Peer endpoint updates (#1893)
## Issue Addressed N/A ## Proposed Changes - rename `address` -> `last_seen_p2p_address` - state and direction filters for `peers` endpoint - metadata count addition to `peers` endpoint - add `peer_count` endpoint Co-authored-by: realbigsean <seananderson33@gmail.com>
This commit is contained in:
parent
fcb4893f72
commit
cb26c15eb6
@ -1353,7 +1353,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
return Ok(api_types::GenericResponse::from(api_types::PeerData {
|
return Ok(api_types::GenericResponse::from(api_types::PeerData {
|
||||||
peer_id: peer_id.to_string(),
|
peer_id: peer_id.to_string(),
|
||||||
enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()),
|
enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()),
|
||||||
address,
|
last_seen_p2p_address: address,
|
||||||
direction: api_types::PeerDirection::from_connection_direction(
|
direction: api_types::PeerDirection::from_connection_direction(
|
||||||
&dir,
|
&dir,
|
||||||
),
|
),
|
||||||
@ -1375,47 +1375,104 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(warp::path("node"))
|
.and(warp::path("node"))
|
||||||
.and(warp::path("peers"))
|
.and(warp::path("peers"))
|
||||||
.and(warp::path::end())
|
.and(warp::path::end())
|
||||||
|
.and(warp::query::<api_types::PeersQuery>())
|
||||||
|
.and(network_globals.clone())
|
||||||
|
.and_then(
|
||||||
|
|query: api_types::PeersQuery, network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
|
||||||
|
blocking_json_task(move || {
|
||||||
|
let mut peers: Vec<api_types::PeerData> = Vec::new();
|
||||||
|
network_globals
|
||||||
|
.peers
|
||||||
|
.read()
|
||||||
|
.peers()
|
||||||
|
.for_each(|(peer_id, peer_info)| {
|
||||||
|
let address =
|
||||||
|
if let Some(socket_addr) = peer_info.seen_addresses.iter().next() {
|
||||||
|
let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip());
|
||||||
|
addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(
|
||||||
|
socket_addr.port(),
|
||||||
|
));
|
||||||
|
addr.to_string()
|
||||||
|
} else if let Some(addr) = peer_info.listening_addresses.first() {
|
||||||
|
addr.to_string()
|
||||||
|
} else {
|
||||||
|
String::new()
|
||||||
|
};
|
||||||
|
|
||||||
|
// the eth2 API spec implies only peers we have been connected to at some point should be included.
|
||||||
|
if let Some(dir) = peer_info.connection_direction.as_ref() {
|
||||||
|
let direction =
|
||||||
|
api_types::PeerDirection::from_connection_direction(&dir);
|
||||||
|
let state = api_types::PeerState::from_peer_connection_status(
|
||||||
|
&peer_info.connection_status(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let state_matches = query.state.as_ref().map_or(true, |states| {
|
||||||
|
states.0.iter().any(|state_param| *state_param == state)
|
||||||
|
});
|
||||||
|
let direction_matches =
|
||||||
|
query.direction.as_ref().map_or(true, |directions| {
|
||||||
|
directions.0.iter().any(|dir_param| *dir_param == direction)
|
||||||
|
});
|
||||||
|
|
||||||
|
if state_matches && direction_matches {
|
||||||
|
peers.push(api_types::PeerData {
|
||||||
|
peer_id: peer_id.to_string(),
|
||||||
|
enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()),
|
||||||
|
last_seen_p2p_address: address,
|
||||||
|
direction,
|
||||||
|
state,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Ok(api_types::PeersData {
|
||||||
|
meta: api_types::PeersMetaData {
|
||||||
|
count: peers.len() as u64,
|
||||||
|
},
|
||||||
|
data: peers,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
// GET node/peer_count
|
||||||
|
let get_node_peer_count = eth1_v1
|
||||||
|
.and(warp::path("node"))
|
||||||
|
.and(warp::path("peer_count"))
|
||||||
|
.and(warp::path::end())
|
||||||
.and(network_globals.clone())
|
.and(network_globals.clone())
|
||||||
.and_then(|network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
|
.and_then(|network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
|
||||||
blocking_json_task(move || {
|
blocking_json_task(move || {
|
||||||
let mut peers: Vec<api_types::PeerData> = Vec::new();
|
let mut connected: u64 = 0;
|
||||||
|
let mut connecting: u64 = 0;
|
||||||
|
let mut disconnected: u64 = 0;
|
||||||
|
let mut disconnecting: u64 = 0;
|
||||||
|
|
||||||
network_globals
|
network_globals
|
||||||
.peers
|
.peers
|
||||||
.read()
|
.read()
|
||||||
.peers()
|
.peers()
|
||||||
// the eth2 API spec implies only peers we have been connected to at some point should be included.
|
.for_each(|(_, peer_info)| {
|
||||||
.filter(|(_, peer_info)| peer_info.connection_direction.is_some())
|
let state = api_types::PeerState::from_peer_connection_status(
|
||||||
.for_each(|(peer_id, peer_info)| {
|
&peer_info.connection_status(),
|
||||||
let address = if let Some(socket_addr) =
|
);
|
||||||
peer_info.seen_addresses.iter().next()
|
match state {
|
||||||
{
|
api_types::PeerState::Connected => connected += 1,
|
||||||
let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip());
|
api_types::PeerState::Connecting => connecting += 1,
|
||||||
addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(socket_addr.port()));
|
api_types::PeerState::Disconnected => disconnected += 1,
|
||||||
addr.to_string()
|
api_types::PeerState::Disconnecting => disconnecting += 1,
|
||||||
} else if let Some(addr) = peer_info.listening_addresses.first() {
|
|
||||||
addr.to_string()
|
|
||||||
} else {
|
|
||||||
String::new()
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(dir) = peer_info.connection_direction.as_ref() {
|
|
||||||
peers.push(api_types::PeerData {
|
|
||||||
peer_id: peer_id.to_string(),
|
|
||||||
enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()),
|
|
||||||
address,
|
|
||||||
direction: api_types::PeerDirection::from_connection_direction(
|
|
||||||
&dir,
|
|
||||||
),
|
|
||||||
state: api_types::PeerState::from_peer_connection_status(
|
|
||||||
&peer_info.connection_status(),
|
|
||||||
),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Ok(api_types::GenericResponse::from(peers))
|
|
||||||
|
Ok(api_types::GenericResponse::from(api_types::PeerCount {
|
||||||
|
disconnecting,
|
||||||
|
connecting,
|
||||||
|
connected,
|
||||||
|
disconnected,
|
||||||
|
}))
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* validator
|
* validator
|
||||||
*/
|
*/
|
||||||
@ -2076,6 +2133,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.or(get_node_health.boxed())
|
.or(get_node_health.boxed())
|
||||||
.or(get_node_peers_by_id.boxed())
|
.or(get_node_peers_by_id.boxed())
|
||||||
.or(get_node_peers.boxed())
|
.or(get_node_peers.boxed())
|
||||||
|
.or(get_node_peer_count.boxed())
|
||||||
.or(get_validator_duties_proposer.boxed())
|
.or(get_validator_duties_proposer.boxed())
|
||||||
.or(get_validator_blocks.boxed())
|
.or(get_validator_blocks.boxed())
|
||||||
.or(get_validator_attestation_data.boxed())
|
.or(get_validator_attestation_data.boxed())
|
||||||
|
@ -1178,7 +1178,7 @@ impl ApiTester {
|
|||||||
let expected = PeerData {
|
let expected = PeerData {
|
||||||
peer_id: self.external_peer_id.to_string(),
|
peer_id: self.external_peer_id.to_string(),
|
||||||
enr: None,
|
enr: None,
|
||||||
address: EXTERNAL_ADDR.to_string(),
|
last_seen_p2p_address: EXTERNAL_ADDR.to_string(),
|
||||||
state: PeerState::Connected,
|
state: PeerState::Connected,
|
||||||
direction: PeerDirection::Inbound,
|
direction: PeerDirection::Inbound,
|
||||||
};
|
};
|
||||||
@ -1189,18 +1189,66 @@ impl ApiTester {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn test_get_node_peers(self) -> Self {
|
pub async fn test_get_node_peers(self) -> Self {
|
||||||
let result = self.client.get_node_peers().await.unwrap().data;
|
let peer_states: Vec<Option<&[PeerState]>> = vec![
|
||||||
|
Some(&[PeerState::Connected]),
|
||||||
|
Some(&[PeerState::Connecting]),
|
||||||
|
Some(&[PeerState::Disconnected]),
|
||||||
|
Some(&[PeerState::Disconnecting]),
|
||||||
|
None,
|
||||||
|
Some(&[PeerState::Connected, PeerState::Connecting]),
|
||||||
|
];
|
||||||
|
let peer_dirs: Vec<Option<&[PeerDirection]>> = vec![
|
||||||
|
Some(&[PeerDirection::Outbound]),
|
||||||
|
Some(&[PeerDirection::Inbound]),
|
||||||
|
Some(&[PeerDirection::Inbound, PeerDirection::Outbound]),
|
||||||
|
None,
|
||||||
|
];
|
||||||
|
|
||||||
let expected = PeerData {
|
for states in peer_states {
|
||||||
peer_id: self.external_peer_id.to_string(),
|
for dirs in peer_dirs.clone() {
|
||||||
enr: None,
|
let result = self.client.get_node_peers(states, dirs).await.unwrap();
|
||||||
address: EXTERNAL_ADDR.to_string(),
|
let expected_peer = PeerData {
|
||||||
state: PeerState::Connected,
|
peer_id: self.external_peer_id.to_string(),
|
||||||
direction: PeerDirection::Inbound,
|
enr: None,
|
||||||
};
|
last_seen_p2p_address: EXTERNAL_ADDR.to_string(),
|
||||||
|
state: PeerState::Connected,
|
||||||
|
direction: PeerDirection::Inbound,
|
||||||
|
};
|
||||||
|
|
||||||
assert_eq!(result, vec![expected]);
|
let state_match =
|
||||||
|
states.map_or(true, |states| states.contains(&PeerState::Connected));
|
||||||
|
let dir_match = dirs.map_or(true, |dirs| dirs.contains(&PeerDirection::Inbound));
|
||||||
|
|
||||||
|
let mut expected_peers = Vec::new();
|
||||||
|
if state_match && dir_match {
|
||||||
|
expected_peers.push(expected_peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
result,
|
||||||
|
PeersData {
|
||||||
|
meta: PeersMetaData {
|
||||||
|
count: expected_peers.len() as u64
|
||||||
|
},
|
||||||
|
data: expected_peers,
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn test_get_node_peer_count(self) -> Self {
|
||||||
|
let result = self.client.get_node_peer_count().await.unwrap().data;
|
||||||
|
assert_eq!(
|
||||||
|
result,
|
||||||
|
PeerCount {
|
||||||
|
connected: 1,
|
||||||
|
connecting: 0,
|
||||||
|
disconnected: 0,
|
||||||
|
disconnecting: 0,
|
||||||
|
}
|
||||||
|
);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1899,6 +1947,8 @@ async fn node_get() {
|
|||||||
.test_get_node_peers_by_id()
|
.test_get_node_peers_by_id()
|
||||||
.await
|
.await
|
||||||
.test_get_node_peers()
|
.test_get_node_peers()
|
||||||
|
.await
|
||||||
|
.test_get_node_peer_count()
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -722,7 +722,11 @@ impl BeaconNodeHttpClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// `GET node/peers`
|
/// `GET node/peers`
|
||||||
pub async fn get_node_peers(&self) -> Result<GenericResponse<Vec<PeerData>>, Error> {
|
pub async fn get_node_peers(
|
||||||
|
&self,
|
||||||
|
states: Option<&[PeerState]>,
|
||||||
|
directions: Option<&[PeerDirection]>,
|
||||||
|
) -> Result<PeersData, Error> {
|
||||||
let mut path = self.eth_path()?;
|
let mut path = self.eth_path()?;
|
||||||
|
|
||||||
path.path_segments_mut()
|
path.path_segments_mut()
|
||||||
@ -730,6 +734,36 @@ impl BeaconNodeHttpClient {
|
|||||||
.push("node")
|
.push("node")
|
||||||
.push("peers");
|
.push("peers");
|
||||||
|
|
||||||
|
if let Some(states) = states {
|
||||||
|
let state_string = states
|
||||||
|
.iter()
|
||||||
|
.map(|i| i.to_string())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(",");
|
||||||
|
path.query_pairs_mut().append_pair("state", &state_string);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(directions) = directions {
|
||||||
|
let dir_string = directions
|
||||||
|
.iter()
|
||||||
|
.map(|i| i.to_string())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(",");
|
||||||
|
path.query_pairs_mut().append_pair("direction", &dir_string);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.get(path).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `GET node/peer_count`
|
||||||
|
pub async fn get_node_peer_count(&self) -> Result<GenericResponse<PeerCount>, Error> {
|
||||||
|
let mut path = self.eth_path()?;
|
||||||
|
|
||||||
|
path.path_segments_mut()
|
||||||
|
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||||
|
.push("node")
|
||||||
|
.push("peer_count");
|
||||||
|
|
||||||
self.get(path).await
|
self.get(path).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -509,15 +509,32 @@ pub struct BeaconCommitteeSubscription {
|
|||||||
pub is_aggregator: bool,
|
pub is_aggregator: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
pub struct PeersQuery {
|
||||||
|
pub state: Option<QueryVec<PeerState>>,
|
||||||
|
pub direction: Option<QueryVec<PeerDirection>>,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
pub struct PeerData {
|
pub struct PeerData {
|
||||||
pub peer_id: String,
|
pub peer_id: String,
|
||||||
pub enr: Option<String>,
|
pub enr: Option<String>,
|
||||||
pub address: String,
|
pub last_seen_p2p_address: String,
|
||||||
pub state: PeerState,
|
pub state: PeerState,
|
||||||
pub direction: PeerDirection,
|
pub direction: PeerDirection,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
|
pub struct PeersData {
|
||||||
|
pub data: Vec<PeerData>,
|
||||||
|
pub meta: PeersMetaData,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
|
pub struct PeersMetaData {
|
||||||
|
pub count: u64,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "lowercase")]
|
#[serde(rename_all = "lowercase")]
|
||||||
pub enum PeerState {
|
pub enum PeerState {
|
||||||
@ -554,6 +571,17 @@ impl FromStr for PeerState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for PeerState {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
PeerState::Connected => write!(f, "connected"),
|
||||||
|
PeerState::Connecting => write!(f, "connecting"),
|
||||||
|
PeerState::Disconnected => write!(f, "disconnected"),
|
||||||
|
PeerState::Disconnecting => write!(f, "disconnecting"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "lowercase")]
|
#[serde(rename_all = "lowercase")]
|
||||||
pub enum PeerDirection {
|
pub enum PeerDirection {
|
||||||
@ -582,6 +610,27 @@ impl FromStr for PeerDirection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for PeerDirection {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
PeerDirection::Inbound => write!(f, "inbound"),
|
||||||
|
PeerDirection::Outbound => write!(f, "outbound"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
|
pub struct PeerCount {
|
||||||
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
|
pub connected: u64,
|
||||||
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
|
pub connecting: u64,
|
||||||
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
|
pub disconnected: u64,
|
||||||
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
|
pub disconnecting: u64,
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
Loading…
Reference in New Issue
Block a user