Shift metadata to the global network variables (#1631)
## Issue Addressed N/A ## Proposed Changes Shifts the local `metadata` to `network_globals` making it accessible to the HTTP API and other areas of lighthouse. ## Additional Info N/A
This commit is contained in:
parent
7b97c4ad30
commit
1db8daae0c
@ -1,6 +1,7 @@
|
|||||||
use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent};
|
use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent};
|
||||||
use crate::rpc::*;
|
use crate::rpc::*;
|
||||||
use crate::types::{EnrBitfield, GossipEncoding, GossipKind, GossipTopic, SubnetDiscovery};
|
use crate::service::METADATA_FILENAME;
|
||||||
|
use crate::types::{GossipEncoding, GossipKind, GossipTopic, SubnetDiscovery};
|
||||||
use crate::Eth2Enr;
|
use crate::Eth2Enr;
|
||||||
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
|
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
@ -23,9 +24,9 @@ use libp2p::{
|
|||||||
PeerId,
|
PeerId,
|
||||||
};
|
};
|
||||||
use slog::{crit, debug, o, trace, warn};
|
use slog::{crit, debug, o, trace, warn};
|
||||||
use ssz::{Decode, Encode};
|
use ssz::Encode;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{Read, Write};
|
use std::io::Write;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::{
|
use std::{
|
||||||
collections::VecDeque,
|
collections::VecDeque,
|
||||||
@ -38,7 +39,6 @@ use types::{EnrForkId, EthSpec, SignedBeaconBlock, SubnetId};
|
|||||||
mod handler;
|
mod handler;
|
||||||
|
|
||||||
const MAX_IDENTIFY_ADDRESSES: usize = 10;
|
const MAX_IDENTIFY_ADDRESSES: usize = 10;
|
||||||
const METADATA_FILENAME: &str = "metadata";
|
|
||||||
|
|
||||||
/// Builds the network behaviour that manages the core protocols of eth2.
|
/// Builds the network behaviour that manages the core protocols of eth2.
|
||||||
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
|
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
|
||||||
@ -58,8 +58,6 @@ pub struct Behaviour<TSpec: EthSpec> {
|
|||||||
events: VecDeque<BehaviourEvent<TSpec>>,
|
events: VecDeque<BehaviourEvent<TSpec>>,
|
||||||
/// Queue of peers to disconnect and an optional reason for the disconnection.
|
/// Queue of peers to disconnect and an optional reason for the disconnection.
|
||||||
peers_to_dc: VecDeque<(PeerId, Option<GoodbyeReason>)>,
|
peers_to_dc: VecDeque<(PeerId, Option<GoodbyeReason>)>,
|
||||||
/// The current meta data of the node, so respond to pings and get metadata
|
|
||||||
meta_data: MetaData<TSpec>,
|
|
||||||
/// A collections of variables accessible outside the network service.
|
/// A collections of variables accessible outside the network service.
|
||||||
network_globals: Arc<NetworkGlobals<TSpec>>,
|
network_globals: Arc<NetworkGlobals<TSpec>>,
|
||||||
/// Keeps track of the current EnrForkId for upgrading gossipsub topics.
|
/// Keeps track of the current EnrForkId for upgrading gossipsub topics.
|
||||||
@ -95,8 +93,6 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|||||||
.eth2()
|
.eth2()
|
||||||
.expect("Local ENR must have a fork id");
|
.expect("Local ENR must have a fork id");
|
||||||
|
|
||||||
let meta_data = load_or_build_metadata(&net_conf.network_dir, &log);
|
|
||||||
|
|
||||||
let gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, net_conf.gs_config.clone())
|
let gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, net_conf.gs_config.clone())
|
||||||
.map_err(|e| format!("Could not construct gossipsub: {:?}", e))?;
|
.map_err(|e| format!("Could not construct gossipsub: {:?}", e))?;
|
||||||
|
|
||||||
@ -115,7 +111,6 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|||||||
.await?,
|
.await?,
|
||||||
events: VecDeque::new(),
|
events: VecDeque::new(),
|
||||||
peers_to_dc: VecDeque::new(),
|
peers_to_dc: VecDeque::new(),
|
||||||
meta_data,
|
|
||||||
network_globals,
|
network_globals,
|
||||||
enr_fork_id,
|
enr_fork_id,
|
||||||
waker: None,
|
waker: None,
|
||||||
@ -407,21 +402,31 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|||||||
|
|
||||||
/// Updates the current meta data of the node to match the local ENR.
|
/// Updates the current meta data of the node to match the local ENR.
|
||||||
fn update_metadata(&mut self) {
|
fn update_metadata(&mut self) {
|
||||||
self.meta_data.seq_number += 1;
|
let local_attnets = self
|
||||||
self.meta_data.attnets = self
|
|
||||||
.peer_manager
|
.peer_manager
|
||||||
.discovery()
|
.discovery()
|
||||||
.local_enr()
|
.local_enr()
|
||||||
.bitfield::<TSpec>()
|
.bitfield::<TSpec>()
|
||||||
.expect("Local discovery must have bitfield");
|
.expect("Local discovery must have bitfield");
|
||||||
|
|
||||||
|
{
|
||||||
|
// write lock scope
|
||||||
|
let mut meta_data = self.network_globals.local_metadata.write();
|
||||||
|
meta_data.seq_number += 1;
|
||||||
|
meta_data.attnets = local_attnets;
|
||||||
|
}
|
||||||
// Save the updated metadata to disk
|
// Save the updated metadata to disk
|
||||||
save_metadata_to_disk(&self.network_dir, self.meta_data.clone(), &self.log);
|
save_metadata_to_disk(
|
||||||
|
&self.network_dir,
|
||||||
|
self.network_globals.local_metadata.read().clone(),
|
||||||
|
&self.log,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a Ping request to the peer.
|
/// Sends a Ping request to the peer.
|
||||||
fn ping(&mut self, id: RequestId, peer_id: PeerId) {
|
fn ping(&mut self, id: RequestId, peer_id: PeerId) {
|
||||||
let ping = crate::rpc::Ping {
|
let ping = crate::rpc::Ping {
|
||||||
data: self.meta_data.seq_number,
|
data: self.network_globals.local_metadata.read().seq_number,
|
||||||
};
|
};
|
||||||
trace!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => peer_id.to_string());
|
trace!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => peer_id.to_string());
|
||||||
|
|
||||||
@ -432,7 +437,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|||||||
/// Sends a Pong response to the peer.
|
/// Sends a Pong response to the peer.
|
||||||
fn pong(&mut self, id: PeerRequestId, peer_id: PeerId) {
|
fn pong(&mut self, id: PeerRequestId, peer_id: PeerId) {
|
||||||
let ping = crate::rpc::Ping {
|
let ping = crate::rpc::Ping {
|
||||||
data: self.meta_data.seq_number,
|
data: self.network_globals.local_metadata.read().seq_number,
|
||||||
};
|
};
|
||||||
trace!(self.log, "Sending Pong"; "request_id" => id.1, "peer_id" => peer_id.to_string());
|
trace!(self.log, "Sending Pong"; "request_id" => id.1, "peer_id" => peer_id.to_string());
|
||||||
let event = RPCCodedResponse::Success(RPCResponse::Pong(ping));
|
let event = RPCCodedResponse::Success(RPCResponse::Pong(ping));
|
||||||
@ -448,7 +453,9 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|||||||
|
|
||||||
/// Sends a METADATA response to a peer.
|
/// Sends a METADATA response to a peer.
|
||||||
fn send_meta_data_response(&mut self, id: PeerRequestId, peer_id: PeerId) {
|
fn send_meta_data_response(&mut self, id: PeerRequestId, peer_id: PeerId) {
|
||||||
let event = RPCCodedResponse::Success(RPCResponse::MetaData(self.meta_data.clone()));
|
let event = RPCCodedResponse::Success(RPCResponse::MetaData(
|
||||||
|
self.network_globals.local_metadata.read().clone(),
|
||||||
|
));
|
||||||
self.eth2_rpc.send_response(peer_id, id, event);
|
self.eth2_rpc.send_response(peer_id, id, event);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1107,45 +1114,8 @@ pub enum BehaviourEvent<TSpec: EthSpec> {
|
|||||||
StatusPeer(PeerId),
|
StatusPeer(PeerId),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load metadata from persisted file. Return default metadata if loading fails.
|
|
||||||
fn load_or_build_metadata<E: EthSpec>(network_dir: &PathBuf, log: &slog::Logger) -> MetaData<E> {
|
|
||||||
// Default metadata
|
|
||||||
let mut meta_data = MetaData {
|
|
||||||
seq_number: 0,
|
|
||||||
attnets: EnrBitfield::<E>::default(),
|
|
||||||
};
|
|
||||||
// Read metadata from persisted file if available
|
|
||||||
let metadata_path = network_dir.join(METADATA_FILENAME);
|
|
||||||
if let Ok(mut metadata_file) = File::open(metadata_path) {
|
|
||||||
let mut metadata_ssz = Vec::new();
|
|
||||||
if metadata_file.read_to_end(&mut metadata_ssz).is_ok() {
|
|
||||||
match MetaData::<E>::from_ssz_bytes(&metadata_ssz) {
|
|
||||||
Ok(persisted_metadata) => {
|
|
||||||
meta_data.seq_number = persisted_metadata.seq_number;
|
|
||||||
// Increment seq number if persisted attnet is not default
|
|
||||||
if persisted_metadata.attnets != meta_data.attnets {
|
|
||||||
meta_data.seq_number += 1;
|
|
||||||
}
|
|
||||||
debug!(log, "Loaded metadata from disk");
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
debug!(
|
|
||||||
log,
|
|
||||||
"Metadata from file could not be decoded";
|
|
||||||
"error" => format!("{:?}", e),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
debug!(log, "Metadata sequence number"; "seq_num" => meta_data.seq_number);
|
|
||||||
save_metadata_to_disk(network_dir, meta_data.clone(), &log);
|
|
||||||
meta_data
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Persist metadata to disk
|
/// Persist metadata to disk
|
||||||
fn save_metadata_to_disk<E: EthSpec>(dir: &PathBuf, metadata: MetaData<E>, log: &slog::Logger) {
|
pub fn save_metadata_to_disk<E: EthSpec>(dir: &PathBuf, metadata: MetaData<E>, log: &slog::Logger) {
|
||||||
let _ = std::fs::create_dir_all(&dir);
|
let _ = std::fs::create_dir_all(&dir);
|
||||||
match File::create(dir.join(METADATA_FILENAME))
|
match File::create(dir.join(METADATA_FILENAME))
|
||||||
.and_then(|mut f| f.write_all(&metadata.as_ssz_bytes()))
|
.and_then(|mut f| f.write_all(&metadata.as_ssz_bytes()))
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
use crate::behaviour::{Behaviour, BehaviourEvent, PeerRequestId, Request, Response};
|
use crate::behaviour::{
|
||||||
|
save_metadata_to_disk, Behaviour, BehaviourEvent, PeerRequestId, Request, Response,
|
||||||
|
};
|
||||||
use crate::discovery::enr;
|
use crate::discovery::enr;
|
||||||
use crate::multiaddr::Protocol;
|
use crate::multiaddr::Protocol;
|
||||||
use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId};
|
use crate::rpc::{GoodbyeReason, MetaData, RPCResponseErrorCode, RequestId};
|
||||||
use crate::types::{error, GossipKind};
|
use crate::types::{error, EnrBitfield, GossipKind};
|
||||||
use crate::EnrExt;
|
use crate::EnrExt;
|
||||||
use crate::{NetworkConfig, NetworkGlobals, PeerAction};
|
use crate::{NetworkConfig, NetworkGlobals, PeerAction};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
@ -15,6 +17,7 @@ use libp2p::{
|
|||||||
PeerId, Swarm, Transport,
|
PeerId, Swarm, Transport,
|
||||||
};
|
};
|
||||||
use slog::{crit, debug, info, o, trace, warn};
|
use slog::{crit, debug, info, o, trace, warn};
|
||||||
|
use ssz::Decode;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::prelude::*;
|
use std::io::prelude::*;
|
||||||
use std::io::{Error, ErrorKind};
|
use std::io::{Error, ErrorKind};
|
||||||
@ -26,6 +29,8 @@ use types::{EnrForkId, EthSpec};
|
|||||||
pub const NETWORK_KEY_FILENAME: &str = "key";
|
pub const NETWORK_KEY_FILENAME: &str = "key";
|
||||||
/// The maximum simultaneous libp2p connections per peer.
|
/// The maximum simultaneous libp2p connections per peer.
|
||||||
const MAX_CONNECTIONS_PER_PEER: usize = 1;
|
const MAX_CONNECTIONS_PER_PEER: usize = 1;
|
||||||
|
/// The filename to store our local metadata.
|
||||||
|
pub const METADATA_FILENAME: &str = "metadata";
|
||||||
|
|
||||||
/// The types of events than can be obtained from polling the libp2p service.
|
/// The types of events than can be obtained from polling the libp2p service.
|
||||||
///
|
///
|
||||||
@ -70,11 +75,15 @@ impl<TSpec: EthSpec> Service<TSpec> {
|
|||||||
enr::build_or_load_enr::<TSpec>(local_keypair.clone(), config, enr_fork_id, &log)?;
|
enr::build_or_load_enr::<TSpec>(local_keypair.clone(), config, enr_fork_id, &log)?;
|
||||||
|
|
||||||
let local_peer_id = enr.peer_id();
|
let local_peer_id = enr.peer_id();
|
||||||
|
|
||||||
|
let meta_data = load_or_build_metadata(&config.network_dir, &log);
|
||||||
|
|
||||||
// set up a collection of variables accessible outside of the network crate
|
// set up a collection of variables accessible outside of the network crate
|
||||||
let network_globals = Arc::new(NetworkGlobals::new(
|
let network_globals = Arc::new(NetworkGlobals::new(
|
||||||
enr.clone(),
|
enr.clone(),
|
||||||
config.libp2p_port,
|
config.libp2p_port,
|
||||||
config.discovery_port,
|
config.discovery_port,
|
||||||
|
meta_data,
|
||||||
&log,
|
&log,
|
||||||
));
|
));
|
||||||
|
|
||||||
@ -420,3 +429,43 @@ fn strip_peer_id(addr: &mut Multiaddr) {
|
|||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Load metadata from persisted file. Return default metadata if loading fails.
|
||||||
|
fn load_or_build_metadata<E: EthSpec>(
|
||||||
|
network_dir: &std::path::PathBuf,
|
||||||
|
log: &slog::Logger,
|
||||||
|
) -> MetaData<E> {
|
||||||
|
// Default metadata
|
||||||
|
let mut meta_data = MetaData {
|
||||||
|
seq_number: 0,
|
||||||
|
attnets: EnrBitfield::<E>::default(),
|
||||||
|
};
|
||||||
|
// Read metadata from persisted file if available
|
||||||
|
let metadata_path = network_dir.join(METADATA_FILENAME);
|
||||||
|
if let Ok(mut metadata_file) = File::open(metadata_path) {
|
||||||
|
let mut metadata_ssz = Vec::new();
|
||||||
|
if metadata_file.read_to_end(&mut metadata_ssz).is_ok() {
|
||||||
|
match MetaData::<E>::from_ssz_bytes(&metadata_ssz) {
|
||||||
|
Ok(persisted_metadata) => {
|
||||||
|
meta_data.seq_number = persisted_metadata.seq_number;
|
||||||
|
// Increment seq number if persisted attnet is not default
|
||||||
|
if persisted_metadata.attnets != meta_data.attnets {
|
||||||
|
meta_data.seq_number += 1;
|
||||||
|
}
|
||||||
|
debug!(log, "Loaded metadata from disk");
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
debug!(
|
||||||
|
log,
|
||||||
|
"Metadata from file could not be decoded";
|
||||||
|
"error" => format!("{:?}", e),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
debug!(log, "Metadata sequence number"; "seq_num" => meta_data.seq_number);
|
||||||
|
save_metadata_to_disk(network_dir, meta_data.clone(), &log);
|
||||||
|
meta_data
|
||||||
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
//! A collection of variables that are accessible outside of the network thread itself.
|
//! A collection of variables that are accessible outside of the network thread itself.
|
||||||
use crate::peer_manager::PeerDB;
|
use crate::peer_manager::PeerDB;
|
||||||
|
use crate::rpc::MetaData;
|
||||||
use crate::types::SyncState;
|
use crate::types::SyncState;
|
||||||
use crate::Client;
|
use crate::Client;
|
||||||
use crate::EnrExt;
|
use crate::EnrExt;
|
||||||
@ -22,6 +23,8 @@ pub struct NetworkGlobals<TSpec: EthSpec> {
|
|||||||
pub listen_port_udp: AtomicU16,
|
pub listen_port_udp: AtomicU16,
|
||||||
/// The collection of known peers.
|
/// The collection of known peers.
|
||||||
pub peers: RwLock<PeerDB<TSpec>>,
|
pub peers: RwLock<PeerDB<TSpec>>,
|
||||||
|
// The local meta data of our node.
|
||||||
|
pub local_metadata: RwLock<MetaData<TSpec>>,
|
||||||
/// The current gossipsub topic subscriptions.
|
/// The current gossipsub topic subscriptions.
|
||||||
pub gossipsub_subscriptions: RwLock<HashSet<GossipTopic>>,
|
pub gossipsub_subscriptions: RwLock<HashSet<GossipTopic>>,
|
||||||
/// The current sync status of the node.
|
/// The current sync status of the node.
|
||||||
@ -29,7 +32,13 @@ pub struct NetworkGlobals<TSpec: EthSpec> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
|
impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
|
||||||
pub fn new(enr: Enr, tcp_port: u16, udp_port: u16, log: &slog::Logger) -> Self {
|
pub fn new(
|
||||||
|
enr: Enr,
|
||||||
|
tcp_port: u16,
|
||||||
|
udp_port: u16,
|
||||||
|
local_metadata: MetaData<TSpec>,
|
||||||
|
log: &slog::Logger,
|
||||||
|
) -> Self {
|
||||||
NetworkGlobals {
|
NetworkGlobals {
|
||||||
local_enr: RwLock::new(enr.clone()),
|
local_enr: RwLock::new(enr.clone()),
|
||||||
peer_id: RwLock::new(enr.peer_id()),
|
peer_id: RwLock::new(enr.peer_id()),
|
||||||
@ -37,6 +46,7 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
|
|||||||
listen_port_tcp: AtomicU16::new(tcp_port),
|
listen_port_tcp: AtomicU16::new(tcp_port),
|
||||||
listen_port_udp: AtomicU16::new(udp_port),
|
listen_port_udp: AtomicU16::new(udp_port),
|
||||||
peers: RwLock::new(PeerDB::new(log)),
|
peers: RwLock::new(PeerDB::new(log)),
|
||||||
|
local_metadata: RwLock::new(local_metadata),
|
||||||
gossipsub_subscriptions: RwLock::new(HashSet::new()),
|
gossipsub_subscriptions: RwLock::new(HashSet::new()),
|
||||||
sync_state: RwLock::new(SyncState::Stalled),
|
sync_state: RwLock::new(SyncState::Stalled),
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,8 @@ mod tests {
|
|||||||
migrate::NullMigrator,
|
migrate::NullMigrator,
|
||||||
};
|
};
|
||||||
use eth2_libp2p::discovery::{build_enr, Keypair};
|
use eth2_libp2p::discovery::{build_enr, Keypair};
|
||||||
|
use eth2_libp2p::rpc::methods::MetaData;
|
||||||
|
use eth2_libp2p::types::EnrBitfield;
|
||||||
use eth2_libp2p::{
|
use eth2_libp2p::{
|
||||||
discovery::CombinedKey, CombinedKeyExt, NetworkConfig, NetworkGlobals, SubnetDiscovery,
|
discovery::CombinedKey, CombinedKeyExt, NetworkConfig, NetworkGlobals, SubnetDiscovery,
|
||||||
};
|
};
|
||||||
@ -102,7 +104,14 @@ mod tests {
|
|||||||
let enr_key = CombinedKey::from_libp2p(&Keypair::generate_secp256k1()).unwrap();
|
let enr_key = CombinedKey::from_libp2p(&Keypair::generate_secp256k1()).unwrap();
|
||||||
let enr = build_enr::<MinimalEthSpec>(&enr_key, &config, EnrForkId::default()).unwrap();
|
let enr = build_enr::<MinimalEthSpec>(&enr_key, &config, EnrForkId::default()).unwrap();
|
||||||
|
|
||||||
let network_globals: NetworkGlobals<MinimalEthSpec> = NetworkGlobals::new(enr, 0, 0, &log);
|
// Default metadata
|
||||||
|
let meta_data = MetaData {
|
||||||
|
seq_number: 0,
|
||||||
|
attnets: EnrBitfield::<MinimalEthSpec>::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let network_globals: NetworkGlobals<MinimalEthSpec> =
|
||||||
|
NetworkGlobals::new(enr, 0, 0, meta_data, &log);
|
||||||
AttestationService::new(beacon_chain, Arc::new(network_globals), &log)
|
AttestationService::new(beacon_chain, Arc::new(network_globals), &log)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user