From a3552a4b7003066e350db89a12ab32fcb504074b Mon Sep 17 00:00:00 2001 From: realbigsean Date: Thu, 22 Oct 2020 02:59:42 +0000 Subject: [PATCH] Node endpoints (#1778) ## Issue Addressed `node` endpoints in #1434 ## Proposed Changes Implement these: ``` /eth/v1/node/health /eth/v1/node/peers/{peer_id} /eth/v1/node/peers ``` - Add an `Option` to `PeerInfo` - Finish implementation of `/eth/v1/node/identity` ## Additional Info - should update the `peers` endpoints when #1764 is resolved Co-authored-by: realbigsean --- Cargo.lock | 1 + .../eth2_libp2p/src/discovery/enr_ext.rs | 60 ++++++- beacon_node/eth2_libp2p/src/discovery/mod.rs | 2 +- beacon_node/eth2_libp2p/src/lib.rs | 3 +- .../eth2_libp2p/src/peer_manager/mod.rs | 10 +- .../eth2_libp2p/src/peer_manager/peer_info.rs | 4 + .../eth2_libp2p/src/peer_manager/peerdb.rs | 60 +++---- beacon_node/http_api/Cargo.toml | 1 + beacon_node/http_api/src/lib.rs | 149 +++++++++++++++++- beacon_node/http_api/tests/tests.rs | 100 +++++++++++- common/eth2/src/lib.rs | 64 ++++++++ common/eth2/src/types.rs | 89 ++++++++++- 12 files changed, 493 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b4d7a413f..4a974b18b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2455,6 +2455,7 @@ name = "http_api" version = "0.1.0" dependencies = [ "beacon_chain", + "bs58", "discv5", "environment", "eth1", diff --git a/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs b/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs index c0df6b16f..9e6e409ae 100644 --- a/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs +++ b/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs @@ -13,9 +13,15 @@ pub trait EnrExt { /// The vector remains empty if these fields are not defined. fn multiaddr(&self) -> Vec; - /// Returns the multiaddr with the `PeerId` prepended. + /// Returns a list of multiaddrs with the `PeerId` prepended. fn multiaddr_p2p(&self) -> Vec; + /// Returns any multiaddrs that contain the TCP protocol with the `PeerId` prepended. + fn multiaddr_p2p_tcp(&self) -> Vec; + + /// Returns any multiaddrs that contain the UDP protocol with the `PeerId` prepended. + fn multiaddr_p2p_udp(&self) -> Vec; + /// Returns any multiaddrs that contain the TCP protocol. fn multiaddr_tcp(&self) -> Vec; } @@ -111,6 +117,58 @@ impl EnrExt for Enr { multiaddrs } + /// Returns a list of multiaddrs if the ENR has an `ip` and a `tcp` key **or** an `ip6` and a `tcp6`. + /// The vector remains empty if these fields are not defined. + /// + /// This also prepends the `PeerId` into each multiaddr with the `P2p` protocol. + fn multiaddr_p2p_tcp(&self) -> Vec { + let peer_id = self.peer_id(); + let mut multiaddrs: Vec = Vec::new(); + if let Some(ip) = self.ip() { + if let Some(tcp) = self.tcp() { + let mut multiaddr: Multiaddr = ip.into(); + multiaddr.push(Protocol::Tcp(tcp)); + multiaddr.push(Protocol::P2p(peer_id.clone().into())); + multiaddrs.push(multiaddr); + } + } + if let Some(ip6) = self.ip6() { + if let Some(tcp6) = self.tcp6() { + let mut multiaddr: Multiaddr = ip6.into(); + multiaddr.push(Protocol::Tcp(tcp6)); + multiaddr.push(Protocol::P2p(peer_id.into())); + multiaddrs.push(multiaddr); + } + } + multiaddrs + } + + /// Returns a list of multiaddrs if the ENR has an `ip` and a `udp` key **or** an `ip6` and a `udp6`. + /// The vector remains empty if these fields are not defined. + /// + /// This also prepends the `PeerId` into each multiaddr with the `P2p` protocol. + fn multiaddr_p2p_udp(&self) -> Vec { + let peer_id = self.peer_id(); + let mut multiaddrs: Vec = Vec::new(); + if let Some(ip) = self.ip() { + if let Some(udp) = self.udp() { + let mut multiaddr: Multiaddr = ip.into(); + multiaddr.push(Protocol::Udp(udp)); + multiaddr.push(Protocol::P2p(peer_id.clone().into())); + multiaddrs.push(multiaddr); + } + } + if let Some(ip6) = self.ip6() { + if let Some(udp6) = self.udp6() { + let mut multiaddr: Multiaddr = ip6.into(); + multiaddr.push(Protocol::Udp(udp6)); + multiaddr.push(Protocol::P2p(peer_id.into())); + multiaddrs.push(multiaddr); + } + } + multiaddrs + } + /// Returns a list of multiaddrs if the ENR has an `ip` and either a `tcp` or `udp` key **or** an `ip6` and either a `tcp6` or `udp6`. /// The vector remains empty if these fields are not defined. fn multiaddr_tcp(&self) -> Vec { diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index 8c29d844c..5e2ee3675 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -5,7 +5,7 @@ pub mod enr_ext; // Allow external use of the lighthouse ENR builder pub use enr::{build_enr, create_enr_builder_from_config, use_or_load_enr, CombinedKey, Eth2Enr}; pub use enr_ext::{peer_id_to_node_id, CombinedKeyExt, EnrExt}; -pub use libp2p::core::identity::Keypair; +pub use libp2p::core::identity::{Keypair, PublicKey}; use crate::metrics; use crate::{error, Enr, NetworkConfig, NetworkGlobals, SubnetDiscovery}; diff --git a/beacon_node/eth2_libp2p/src/lib.rs b/beacon_node/eth2_libp2p/src/lib.rs index f5a989b73..6d5526d97 100644 --- a/beacon_node/eth2_libp2p/src/lib.rs +++ b/beacon_node/eth2_libp2p/src/lib.rs @@ -69,6 +69,7 @@ pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{multiaddr, Multiaddr}; pub use metrics::scrape_discovery_metrics; pub use peer_manager::{ - client::Client, score::PeerAction, PeerDB, PeerInfo, PeerSyncStatus, SyncInfo, + client::Client, score::PeerAction, ConnectionDirection, PeerConnectionStatus, PeerDB, PeerInfo, + PeerSyncStatus, SyncInfo, }; pub use service::{load_private_key, Libp2pEvent, Service, NETWORK_KEY_FILENAME}; diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index dc8786f81..753dc3984 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -29,7 +29,7 @@ mod peer_sync_status; mod peerdb; pub(crate) mod score; -pub use peer_info::{PeerConnectionStatus::*, PeerInfo}; +pub use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo}; pub use peer_sync_status::{PeerSyncStatus, SyncInfo}; use score::{PeerAction, ScoreState}; use std::collections::HashMap; @@ -623,16 +623,18 @@ impl PeerManager { slog::crit!(self.log, "Connection has been allowed to a banned peer"; "peer_id" => peer_id.to_string()); } + let enr = self.discovery.enr_of_peer(peer_id); + match connection { ConnectingType::Dialing => { - peerdb.dialing_peer(peer_id); + peerdb.dialing_peer(peer_id, enr); return true; } ConnectingType::IngoingConnected { multiaddr } => { - peerdb.connect_outgoing(peer_id, multiaddr) + peerdb.connect_outgoing(peer_id, multiaddr, enr) } ConnectingType::OutgoingConnected { multiaddr } => { - peerdb.connect_ingoing(peer_id, multiaddr) + peerdb.connect_ingoing(peer_id, multiaddr, enr) } } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index 71025ee52..c97ec8ec6 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -3,6 +3,7 @@ use super::score::{PeerAction, Score, ScoreState}; use super::PeerSyncStatus; use crate::rpc::MetaData; use crate::Multiaddr; +use discv5::Enr; use serde::{ ser::{SerializeStruct, Serializer}, Serialize, @@ -46,6 +47,8 @@ pub struct PeerInfo { /// Direction of the first connection of the last (or current) connected session with this peer. /// None if this peer was never connected. pub connection_direction: Option, + /// The enr of the peer, if known. + pub enr: Option, } impl Default for PeerInfo { @@ -62,6 +65,7 @@ impl Default for PeerInfo { min_ttl: None, is_trusted: false, connection_direction: None, + enr: None, } } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index 7cbcf92fe..e3d8d2a0e 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -3,6 +3,7 @@ use super::peer_sync_status::PeerSyncStatus; use super::score::{Score, ScoreState}; use crate::multiaddr::{Multiaddr, Protocol}; use crate::rpc::methods::MetaData; +use crate::Enr; use crate::PeerId; use rand::seq::SliceRandom; use slog::{crit, debug, trace, warn}; @@ -191,7 +192,7 @@ impl PeerDB { } } - /// Gives the ids of all known connected peers. + /// Gives the ids and info of all known connected peers. pub fn connected_peers(&self) -> impl Iterator)> { self.peers .iter() @@ -318,8 +319,9 @@ impl PeerDB { /* Setters */ /// A peer is being dialed. - pub fn dialing_peer(&mut self, peer_id: &PeerId) { + pub fn dialing_peer(&mut self, peer_id: &PeerId, enr: Option) { let info = self.peers.entry(peer_id.clone()).or_default(); + info.enr = enr; if info.connection_status.is_disconnected() { self.disconnected_peers = self.disconnected_peers.saturating_sub(1); @@ -370,8 +372,9 @@ impl PeerDB { } /// Sets a peer as connected with an ingoing connection. - pub fn connect_ingoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr) { + pub fn connect_ingoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option) { let info = self.peers.entry(peer_id.clone()).or_default(); + info.enr = enr; if info.connection_status.is_disconnected() { self.disconnected_peers = self.disconnected_peers.saturating_sub(1); @@ -391,8 +394,9 @@ impl PeerDB { } /// Sets a peer as connected with an outgoing connection. - pub fn connect_outgoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr) { + pub fn connect_outgoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option) { let info = self.peers.entry(peer_id.clone()).or_default(); + info.enr = enr; if info.connection_status.is_disconnected() { self.disconnected_peers = self.disconnected_peers.saturating_sub(1); @@ -578,10 +582,10 @@ mod tests { let (n_in, n_out) = (10, 20); for _ in 0..n_in { - pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); } for _ in 0..n_out { - pdb.connect_outgoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_outgoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); } // the peer is known @@ -606,7 +610,7 @@ mod tests { for _ in 0..MAX_DC_PEERS + 1 { let p = PeerId::random(); - pdb.connect_ingoing(&p, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&p, "/ip4/0.0.0.0".parse().unwrap(), None); } assert_eq!(pdb.disconnected_peers, 0); @@ -623,7 +627,7 @@ mod tests { for _ in 0..MAX_BANNED_PEERS + 1 { let p = PeerId::random(); - pdb.connect_ingoing(&p, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&p, "/ip4/0.0.0.0".parse().unwrap(), None); } assert_eq!(pdb.banned_peers_count.banned_peers(), 0); @@ -641,9 +645,9 @@ mod tests { let p0 = PeerId::random(); let p1 = PeerId::random(); let p2 = PeerId::random(); - pdb.connect_ingoing(&p0, "/ip4/0.0.0.0".parse().unwrap()); - pdb.connect_ingoing(&p1, "/ip4/0.0.0.0".parse().unwrap()); - pdb.connect_ingoing(&p2, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&p0, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&p2, "/ip4/0.0.0.0".parse().unwrap(), None); add_score(&mut pdb, &p0, 70.0); add_score(&mut pdb, &p1, 100.0); add_score(&mut pdb, &p2, 50.0); @@ -663,9 +667,9 @@ mod tests { let p0 = PeerId::random(); let p1 = PeerId::random(); let p2 = PeerId::random(); - pdb.connect_ingoing(&p0, "/ip4/0.0.0.0".parse().unwrap()); - pdb.connect_ingoing(&p1, "/ip4/0.0.0.0".parse().unwrap()); - pdb.connect_ingoing(&p2, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&p0, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&p2, "/ip4/0.0.0.0".parse().unwrap(), None); add_score(&mut pdb, &p0, 70.0); add_score(&mut pdb, &p1, 100.0); add_score(&mut pdb, &p2, 50.0); @@ -683,18 +687,18 @@ mod tests { let random_peer = PeerId::random(); - pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); dbg!("1"); - pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); dbg!("1"); pdb.disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); dbg!("1"); - pdb.connect_outgoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_outgoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); dbg!("1"); pdb.disconnect(&random_peer); @@ -725,20 +729,20 @@ mod tests { let random_peer2 = PeerId::random(); let random_peer3 = PeerId::random(); - pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap()); - pdb.connect_ingoing(&random_peer1, "/ip4/0.0.0.0".parse().unwrap()); - pdb.connect_ingoing(&random_peer2, "/ip4/0.0.0.0".parse().unwrap()); - pdb.connect_ingoing(&random_peer3, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&random_peer1, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&random_peer2, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&random_peer3, "/ip4/0.0.0.0".parse().unwrap(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), pdb.banned_peers().count() ); - pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); pdb.disconnect(&random_peer1); pdb.ban(&random_peer2); - pdb.connect_ingoing(&random_peer3, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&random_peer3, "/ip4/0.0.0.0".parse().unwrap(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), @@ -751,7 +755,7 @@ mod tests { pdb.banned_peers().count() ); - pdb.connect_outgoing(&random_peer2, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_outgoing(&random_peer2, "/ip4/0.0.0.0".parse().unwrap(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), @@ -765,10 +769,10 @@ mod tests { ); pdb.ban(&random_peer3); - pdb.connect_ingoing(&random_peer1, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&random_peer1, "/ip4/0.0.0.0".parse().unwrap(), None); pdb.disconnect(&random_peer2); pdb.ban(&random_peer3); - pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), @@ -797,7 +801,7 @@ mod tests { for ip in ips { let mut addr = Multiaddr::empty(); addr.push(Protocol::from(ip)); - pdb.connect_ingoing(&p, addr); + pdb.connect_ingoing(&p, addr, None); } p } @@ -956,7 +960,7 @@ mod tests { let log = build_log(slog::Level::Debug, false); let mut pdb: PeerDB = PeerDB::new(vec![trusted_peer.clone()], &log); - pdb.connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap()); + pdb.connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None); // Check trusted status and score assert!(pdb.peer_info(&trusted_peer).unwrap().is_trusted); diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index af2ebfeca..608864a9d 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -24,6 +24,7 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics" } lazy_static = "1.4.0" warp_utils = { path = "../../common/warp_utils" } slot_clock = { path = "../../common/slot_clock" } +bs58 = "0.3.1" [dev-dependencies] store = { path = "../store" } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 28cb38a24..c608821b1 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -21,7 +21,7 @@ use eth2::{ types::{self as api_types, ValidatorId}, StatusCode, }; -use eth2_libp2p::{types::SyncState, NetworkGlobals, PubsubMessage}; +use eth2_libp2p::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; use lighthouse_version::version_with_platform; use network::NetworkMessage; use parking_lot::Mutex; @@ -109,7 +109,11 @@ pub fn slog_logging( ) -> warp::filters::log::Log { warp::log::custom(move |info| { match info.status() { - status if status == StatusCode::OK || status == StatusCode::NOT_FOUND => { + status + if status == StatusCode::OK + || status == StatusCode::NOT_FOUND + || status == StatusCode::PARTIAL_CONTENT => + { trace!( log, "Processed HTTP API request"; @@ -1106,10 +1110,28 @@ pub fn serve( .and(network_globals.clone()) .and_then(|network_globals: Arc>| { blocking_json_task(move || { + let enr = network_globals.local_enr(); + let p2p_addresses = enr.multiaddr_p2p_tcp(); + let discovery_addresses = enr.multiaddr_p2p_udp(); Ok(api_types::GenericResponse::from(api_types::IdentityData { peer_id: network_globals.local_peer_id().to_base58(), - enr: network_globals.local_enr(), - p2p_addresses: network_globals.listen_multiaddrs(), + enr, + p2p_addresses, + discovery_addresses, + metadata: api_types::MetaData { + seq_number: network_globals.local_metadata.read().seq_number, + attnets: format!( + "0x{}", + hex::encode( + network_globals + .local_metadata + .read() + .attnets + .clone() + .into_bytes() + ), + ), + }, })) }) }); @@ -1159,6 +1181,122 @@ pub fn serve( }, ); + // GET node/health + let get_node_health = eth1_v1 + .and(warp::path("node")) + .and(warp::path("health")) + .and(warp::path::end()) + .and(network_globals.clone()) + .and_then(|network_globals: Arc>| { + blocking_task(move || match *network_globals.sync_state.read() { + SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. } => { + Ok(warp::reply::with_status( + warp::reply(), + warp::http::StatusCode::PARTIAL_CONTENT, + )) + } + SyncState::Synced => Ok(warp::reply::with_status( + warp::reply(), + warp::http::StatusCode::OK, + )), + SyncState::Stalled => Err(warp_utils::reject::not_synced( + "sync stalled, beacon chain may not yet be initialized.".to_string(), + )), + }) + }); + + // GET node/peers/{peer_id} + let get_node_peers_by_id = eth1_v1 + .and(warp::path("node")) + .and(warp::path("peers")) + .and(warp::path::param::()) + .and(warp::path::end()) + .and(network_globals.clone()) + .and_then( + |requested_peer_id: String, network_globals: Arc>| { + blocking_json_task(move || { + let peer_id = PeerId::from_bytes( + bs58::decode(requested_peer_id.as_str()) + .into_vec() + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "invalid peer id: {}", + e + )) + })?, + ) + .map_err(|_| { + warp_utils::reject::custom_bad_request("invalid peer id.".to_string()) + })?; + + if let Some(peer_info) = network_globals.peers.read().peer_info(&peer_id) { + //TODO: update this to seen_addresses once #1764 is resolved + let address = match peer_info.listening_addresses.get(0) { + Some(addr) => addr.to_string(), + None => "".to_string(), // this field is non-nullable in the eth2 API spec + }; + + // the eth2 API spec implies only peers we have been connected to at some point should be included. + if let Some(dir) = peer_info.connection_direction.as_ref() { + return Ok(api_types::GenericResponse::from(api_types::PeerData { + peer_id: peer_id.to_string(), + enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), + last_seen_p2p_address: address, + direction: api_types::PeerDirection::from_connection_direction( + &dir, + ), + state: api_types::PeerState::from_peer_connection_status( + &peer_info.connection_status, + ), + })); + } + } + Err(warp_utils::reject::custom_not_found( + "peer not found.".to_string(), + )) + }) + }, + ); + + // GET node/peers + let get_node_peers = eth1_v1 + .and(warp::path("node")) + .and(warp::path("peers")) + .and(warp::path::end()) + .and(network_globals.clone()) + .and_then(|network_globals: Arc>| { + blocking_json_task(move || { + let mut peers: Vec = Vec::new(); + network_globals + .peers + .read() + .peers() + // the eth2 API spec implies only peers we have been connected to at some point should be included. + .filter(|(_, peer_info)| peer_info.connection_direction.is_some()) + .for_each(|(peer_id, peer_info)| { + //TODO: update this to seen_addresses once #1764 is resolved + let address = match peer_info.listening_addresses.get(0) { + Some(addr) => addr.to_string(), + None => "".to_string(), // this field is non-nullable in the eth2 API spec + }; + if let Some(dir) = peer_info.connection_direction.as_ref() { + peers.push(api_types::PeerData { + peer_id: peer_id.to_string(), + enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), + last_seen_p2p_address: address, + direction: api_types::PeerDirection::from_connection_direction( + &dir, + ), + state: api_types::PeerState::from_peer_connection_status( + &peer_info.connection_status, + ), + }); + } + }); + Ok(api_types::GenericResponse::from(peers)) + }) + }); + /* * validator */ @@ -1653,6 +1791,9 @@ pub fn serve( .or(get_node_identity.boxed()) .or(get_node_version.boxed()) .or(get_node_syncing.boxed()) + .or(get_node_health.boxed()) + .or(get_node_peers_by_id.boxed()) + .or(get_node_peers.boxed()) .or(get_validator_duties_attester.boxed()) .or(get_validator_duties_proposer.boxed()) .or(get_validator_blocks.boxed()) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index a161ce526..adcd0ae5e 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -8,7 +8,7 @@ use eth2::{types::*, BeaconNodeHttpClient, Url}; use eth2_libp2p::{ rpc::methods::MetaData, types::{EnrBitfield, SyncState}, - NetworkGlobals, + Enr, EnrExt, NetworkGlobals, PeerId, }; use http_api::{Config, Context}; use network::NetworkMessage; @@ -23,6 +23,7 @@ use types::{ test_utils::generate_deterministic_keypairs, AggregateSignature, BeaconState, BitList, Domain, EthSpec, Hash256, Keypair, MainnetEthSpec, RelativeEpoch, SelectionProof, SignedRoot, Slot, }; +use warp::http::StatusCode; type E = MainnetEthSpec; @@ -31,6 +32,10 @@ const VALIDATOR_COUNT: usize = SLOTS_PER_EPOCH as usize; const CHAIN_LENGTH: u64 = SLOTS_PER_EPOCH * 5; const JUSTIFIED_EPOCH: u64 = 4; const FINALIZED_EPOCH: u64 = 3; +const TCP_PORT: u16 = 42; +const UDP_PORT: u16 = 42; +const SEQ_NUMBER: u64 = 0; +const EXTERNAL_ADDR: &str = "/ip4/0.0.0.0"; /// Skipping the slots around the epoch boundary allows us to check that we're obtaining states /// from skipped slots for the finalized and justified checkpoints (instead of the state from the @@ -53,6 +58,8 @@ struct ApiTester { _server_shutdown: oneshot::Sender<()>, validator_keypairs: Vec, network_rx: mpsc::UnboundedReceiver>, + local_enr: Enr, + external_peer_id: PeerId, } impl ApiTester { @@ -139,12 +146,24 @@ impl ApiTester { // Default metadata let meta_data = MetaData { - seq_number: 0, - attnets: EnrBitfield::::default(), + seq_number: SEQ_NUMBER, + attnets: EnrBitfield::::default(), }; let enr_key = CombinedKey::generate_secp256k1(); let enr = EnrBuilder::new("v4").build(&enr_key).unwrap(); - let network_globals = NetworkGlobals::new(enr, 42, 42, meta_data, vec![], &log); + let enr_clone = enr.clone(); + let network_globals = NetworkGlobals::new(enr, TCP_PORT, UDP_PORT, meta_data, vec![], &log); + + let peer_id = PeerId::random(); + network_globals.peers.write().connect_ingoing( + &peer_id, + EXTERNAL_ADDR.parse().unwrap(), + None, + ); + //TODO: have to update this once #1764 is resolved + if let Some(peer_info) = network_globals.peers.write().peer_info_mut(&peer_id) { + peer_info.listening_addresses = vec![EXTERNAL_ADDR.parse().unwrap()]; + } *network_globals.sync_state.write() = SyncState::Synced; @@ -190,6 +209,8 @@ impl ApiTester { _server_shutdown: shutdown_tx, validator_keypairs: harness.validator_keypairs, network_rx, + local_enr: enr_clone, + external_peer_id: peer_id, } } @@ -1004,6 +1025,69 @@ impl ApiTester { self } + pub async fn test_get_node_identity(self) -> Self { + let result = self.client.get_node_identity().await.unwrap().data; + + let expected = IdentityData { + peer_id: self.local_enr.peer_id().to_string(), + enr: self.local_enr.clone(), + p2p_addresses: self.local_enr.multiaddr_p2p_tcp(), + discovery_addresses: self.local_enr.multiaddr_p2p_udp(), + metadata: eth2::types::MetaData { + seq_number: 0, + attnets: "0x0000000000000000".to_string(), + }, + }; + + assert_eq!(result, expected); + + self + } + + pub async fn test_get_node_health(self) -> Self { + let status = self.client.get_node_health().await.unwrap(); + assert_eq!(status, StatusCode::OK); + + self + } + + pub async fn test_get_node_peers_by_id(self) -> Self { + let result = self + .client + .get_node_peers_by_id(self.external_peer_id.clone()) + .await + .unwrap() + .data; + + let expected = PeerData { + peer_id: self.external_peer_id.to_string(), + enr: None, + last_seen_p2p_address: EXTERNAL_ADDR.to_string(), + state: PeerState::Connected, + direction: PeerDirection::Inbound, + }; + + assert_eq!(result, expected); + + self + } + + pub async fn test_get_node_peers(self) -> Self { + let result = self.client.get_node_peers().await.unwrap().data; + + let expected = PeerData { + peer_id: self.external_peer_id.to_string(), + enr: None, + last_seen_p2p_address: EXTERNAL_ADDR.to_string(), + state: PeerState::Connected, + direction: PeerDirection::Inbound, + }; + + assert_eq!(result, vec![expected]); + + self + } + pub async fn test_get_debug_beacon_states(self) -> Self { for state_id in self.interesting_state_ids() { let result = self @@ -1660,6 +1744,14 @@ async fn node_get() { .test_get_node_version() .await .test_get_node_syncing() + .await + .test_get_node_identity() + .await + .test_get_node_health() + .await + .test_get_node_peers_by_id() + .await + .test_get_node_peers() .await; } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 8e9a18b47..9c1019b55 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -18,6 +18,7 @@ use serde::{de::DeserializeOwned, Serialize}; use std::convert::TryFrom; use std::fmt; +use eth2_libp2p::PeerId; pub use reqwest; pub use reqwest::{StatusCode, Url}; @@ -582,6 +583,18 @@ impl BeaconNodeHttpClient { self.get(path).await } + /// `GET node/identity` + pub async fn get_node_identity(&self) -> Result, Error> { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("node") + .push("identity"); + + self.get(path).await + } + /// `GET node/syncing` pub async fn get_node_syncing(&self) -> Result, Error> { let mut path = self.eth_path()?; @@ -594,6 +607,57 @@ impl BeaconNodeHttpClient { self.get(path).await } + /// `GET node/health` + pub async fn get_node_health(&self) -> Result { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("node") + .push("health"); + + let status = self + .client + .get(path) + .send() + .await + .map_err(Error::Reqwest)? + .status(); + if status == StatusCode::OK || status == StatusCode::PARTIAL_CONTENT { + Ok(status) + } else { + Err(Error::StatusCode(status)) + } + } + + /// `GET node/peers/{peer_id}` + pub async fn get_node_peers_by_id( + &self, + peer_id: PeerId, + ) -> Result, Error> { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("node") + .push("peers") + .push(&peer_id.to_string()); + + self.get(path).await + } + + /// `GET node/peers` + pub async fn get_node_peers(&self) -> Result>, Error> { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("node") + .push("peers"); + + self.get(path).await + } + /// `GET debug/beacon/states/{state_id}` pub async fn get_debug_beacon_states( &self, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index c3a8d240c..03ff1b892 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1,7 +1,7 @@ //! This module exposes a superset of the `types` crate. It adds additional types that are only //! required for the HTTP API. -use eth2_libp2p::{Enr, Multiaddr}; +use eth2_libp2p::{ConnectionDirection, Enr, Multiaddr, PeerConnectionStatus}; use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use std::fmt; @@ -321,12 +321,15 @@ pub struct IdentityData { pub peer_id: String, pub enr: Enr, pub p2p_addresses: Vec, - // TODO: missing the following fields: - // - // - discovery_addresses - // - metadata - // - // Tracked here: https://github.com/sigp/lighthouse/issues/1434 + pub discovery_addresses: Vec, + pub metadata: MetaData, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct MetaData { + #[serde(with = "serde_utils::quoted_u64")] + pub seq_number: u64, + pub attnets: String, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -418,6 +421,78 @@ pub struct BeaconCommitteeSubscription { pub is_aggregator: bool, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct PeerData { + pub peer_id: String, + pub enr: Option, + pub last_seen_p2p_address: String, + pub state: PeerState, + pub direction: PeerDirection, +} + +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum PeerState { + Connected, + Connecting, + Disconnected, + Disconnecting, +} + +impl PeerState { + pub fn from_peer_connection_status(status: &PeerConnectionStatus) -> Self { + match status { + PeerConnectionStatus::Connected { .. } => PeerState::Connected, + PeerConnectionStatus::Dialing { .. } => PeerState::Connecting, + PeerConnectionStatus::Disconnected { .. } + | PeerConnectionStatus::Banned { .. } + | PeerConnectionStatus::Unknown => PeerState::Disconnected, + } + } +} + +impl FromStr for PeerState { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "connected" => Ok(PeerState::Connected), + "connecting" => Ok(PeerState::Connecting), + "disconnected" => Ok(PeerState::Disconnected), + "disconnecting" => Ok(PeerState::Disconnecting), + _ => Err("peer state cannot be parsed.".to_string()), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum PeerDirection { + Inbound, + Outbound, +} + +impl PeerDirection { + pub fn from_connection_direction(direction: &ConnectionDirection) -> Self { + match direction { + ConnectionDirection::Incoming => PeerDirection::Inbound, + ConnectionDirection::Outgoing => PeerDirection::Outbound, + } + } +} + +impl FromStr for PeerDirection { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "inbound" => Ok(PeerDirection::Inbound), + "outbound" => Ok(PeerDirection::Outbound), + _ => Err("peer direction cannot be parsed.".to_string()), + } + } +} + #[cfg(test)] mod tests { use super::*;