diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 071565449..cb9c367fb 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -1,6 +1,7 @@ use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent}; use crate::rpc::*; -use crate::types::{EnrBitfield, GossipEncoding, GossipKind, GossipTopic, SubnetDiscovery}; +use crate::service::METADATA_FILENAME; +use crate::types::{GossipEncoding, GossipKind, GossipTopic, SubnetDiscovery}; use crate::Eth2Enr; use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use futures::prelude::*; @@ -23,9 +24,9 @@ use libp2p::{ PeerId, }; use slog::{crit, debug, o, trace, warn}; -use ssz::{Decode, Encode}; +use ssz::Encode; use std::fs::File; -use std::io::{Read, Write}; +use std::io::Write; use std::path::PathBuf; use std::{ collections::VecDeque, @@ -38,7 +39,6 @@ use types::{EnrForkId, EthSpec, SignedBeaconBlock, SubnetId}; mod handler; const MAX_IDENTIFY_ADDRESSES: usize = 10; -const METADATA_FILENAME: &str = "metadata"; /// Builds the network behaviour that manages the core protocols of eth2. /// This core behaviour is managed by `Behaviour` which adds peer management to all core @@ -58,8 +58,6 @@ pub struct Behaviour { events: VecDeque>, /// Queue of peers to disconnect and an optional reason for the disconnection. peers_to_dc: VecDeque<(PeerId, Option)>, - /// The current meta data of the node, so respond to pings and get metadata - meta_data: MetaData, /// A collections of variables accessible outside the network service. network_globals: Arc>, /// Keeps track of the current EnrForkId for upgrading gossipsub topics. @@ -95,8 +93,6 @@ impl Behaviour { .eth2() .expect("Local ENR must have a fork id"); - let meta_data = load_or_build_metadata(&net_conf.network_dir, &log); - let gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, net_conf.gs_config.clone()) .map_err(|e| format!("Could not construct gossipsub: {:?}", e))?; @@ -115,7 +111,6 @@ impl Behaviour { .await?, events: VecDeque::new(), peers_to_dc: VecDeque::new(), - meta_data, network_globals, enr_fork_id, waker: None, @@ -407,21 +402,31 @@ impl Behaviour { /// Updates the current meta data of the node to match the local ENR. fn update_metadata(&mut self) { - self.meta_data.seq_number += 1; - self.meta_data.attnets = self + let local_attnets = self .peer_manager .discovery() .local_enr() .bitfield::() .expect("Local discovery must have bitfield"); + + { + // write lock scope + let mut meta_data = self.network_globals.local_metadata.write(); + meta_data.seq_number += 1; + meta_data.attnets = local_attnets; + } // Save the updated metadata to disk - save_metadata_to_disk(&self.network_dir, self.meta_data.clone(), &self.log); + save_metadata_to_disk( + &self.network_dir, + self.network_globals.local_metadata.read().clone(), + &self.log, + ); } /// Sends a Ping request to the peer. fn ping(&mut self, id: RequestId, peer_id: PeerId) { let ping = crate::rpc::Ping { - data: self.meta_data.seq_number, + data: self.network_globals.local_metadata.read().seq_number, }; trace!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => peer_id.to_string()); @@ -432,7 +437,7 @@ impl Behaviour { /// Sends a Pong response to the peer. fn pong(&mut self, id: PeerRequestId, peer_id: PeerId) { let ping = crate::rpc::Ping { - data: self.meta_data.seq_number, + data: self.network_globals.local_metadata.read().seq_number, }; trace!(self.log, "Sending Pong"; "request_id" => id.1, "peer_id" => peer_id.to_string()); let event = RPCCodedResponse::Success(RPCResponse::Pong(ping)); @@ -448,7 +453,9 @@ impl Behaviour { /// Sends a METADATA response to a peer. fn send_meta_data_response(&mut self, id: PeerRequestId, peer_id: PeerId) { - let event = RPCCodedResponse::Success(RPCResponse::MetaData(self.meta_data.clone())); + let event = RPCCodedResponse::Success(RPCResponse::MetaData( + self.network_globals.local_metadata.read().clone(), + )); self.eth2_rpc.send_response(peer_id, id, event); } @@ -1107,45 +1114,8 @@ pub enum BehaviourEvent { StatusPeer(PeerId), } -/// Load metadata from persisted file. Return default metadata if loading fails. -fn load_or_build_metadata(network_dir: &PathBuf, log: &slog::Logger) -> MetaData { - // Default metadata - let mut meta_data = MetaData { - seq_number: 0, - attnets: EnrBitfield::::default(), - }; - // Read metadata from persisted file if available - let metadata_path = network_dir.join(METADATA_FILENAME); - if let Ok(mut metadata_file) = File::open(metadata_path) { - let mut metadata_ssz = Vec::new(); - if metadata_file.read_to_end(&mut metadata_ssz).is_ok() { - match MetaData::::from_ssz_bytes(&metadata_ssz) { - Ok(persisted_metadata) => { - meta_data.seq_number = persisted_metadata.seq_number; - // Increment seq number if persisted attnet is not default - if persisted_metadata.attnets != meta_data.attnets { - meta_data.seq_number += 1; - } - debug!(log, "Loaded metadata from disk"); - } - Err(e) => { - debug!( - log, - "Metadata from file could not be decoded"; - "error" => format!("{:?}", e), - ); - } - } - } - }; - - debug!(log, "Metadata sequence number"; "seq_num" => meta_data.seq_number); - save_metadata_to_disk(network_dir, meta_data.clone(), &log); - meta_data -} - /// Persist metadata to disk -fn save_metadata_to_disk(dir: &PathBuf, metadata: MetaData, log: &slog::Logger) { +pub fn save_metadata_to_disk(dir: &PathBuf, metadata: MetaData, log: &slog::Logger) { let _ = std::fs::create_dir_all(&dir); match File::create(dir.join(METADATA_FILENAME)) .and_then(|mut f| f.write_all(&metadata.as_ssz_bytes())) diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 67421cc96..50952eff4 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -1,8 +1,10 @@ -use crate::behaviour::{Behaviour, BehaviourEvent, PeerRequestId, Request, Response}; +use crate::behaviour::{ + save_metadata_to_disk, Behaviour, BehaviourEvent, PeerRequestId, Request, Response, +}; use crate::discovery::enr; use crate::multiaddr::Protocol; -use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId}; -use crate::types::{error, GossipKind}; +use crate::rpc::{GoodbyeReason, MetaData, RPCResponseErrorCode, RequestId}; +use crate::types::{error, EnrBitfield, GossipKind}; use crate::EnrExt; use crate::{NetworkConfig, NetworkGlobals, PeerAction}; use futures::prelude::*; @@ -15,6 +17,7 @@ use libp2p::{ PeerId, Swarm, Transport, }; use slog::{crit, debug, info, o, trace, warn}; +use ssz::Decode; use std::fs::File; use std::io::prelude::*; use std::io::{Error, ErrorKind}; @@ -26,6 +29,8 @@ use types::{EnrForkId, EthSpec}; pub const NETWORK_KEY_FILENAME: &str = "key"; /// The maximum simultaneous libp2p connections per peer. const MAX_CONNECTIONS_PER_PEER: usize = 1; +/// The filename to store our local metadata. +pub const METADATA_FILENAME: &str = "metadata"; /// The types of events than can be obtained from polling the libp2p service. /// @@ -70,11 +75,15 @@ impl Service { enr::build_or_load_enr::(local_keypair.clone(), config, enr_fork_id, &log)?; let local_peer_id = enr.peer_id(); + + let meta_data = load_or_build_metadata(&config.network_dir, &log); + // set up a collection of variables accessible outside of the network crate let network_globals = Arc::new(NetworkGlobals::new( enr.clone(), config.libp2p_port, config.discovery_port, + meta_data, &log, )); @@ -420,3 +429,43 @@ fn strip_peer_id(addr: &mut Multiaddr) { _ => {} } } + +/// Load metadata from persisted file. Return default metadata if loading fails. +fn load_or_build_metadata( + network_dir: &std::path::PathBuf, + log: &slog::Logger, +) -> MetaData { + // Default metadata + let mut meta_data = MetaData { + seq_number: 0, + attnets: EnrBitfield::::default(), + }; + // Read metadata from persisted file if available + let metadata_path = network_dir.join(METADATA_FILENAME); + if let Ok(mut metadata_file) = File::open(metadata_path) { + let mut metadata_ssz = Vec::new(); + if metadata_file.read_to_end(&mut metadata_ssz).is_ok() { + match MetaData::::from_ssz_bytes(&metadata_ssz) { + Ok(persisted_metadata) => { + meta_data.seq_number = persisted_metadata.seq_number; + // Increment seq number if persisted attnet is not default + if persisted_metadata.attnets != meta_data.attnets { + meta_data.seq_number += 1; + } + debug!(log, "Loaded metadata from disk"); + } + Err(e) => { + debug!( + log, + "Metadata from file could not be decoded"; + "error" => format!("{:?}", e), + ); + } + } + } + }; + + debug!(log, "Metadata sequence number"; "seq_num" => meta_data.seq_number); + save_metadata_to_disk(network_dir, meta_data.clone(), &log); + meta_data +} diff --git a/beacon_node/eth2_libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs index a4dec670a..bfed2fd9b 100644 --- a/beacon_node/eth2_libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -1,5 +1,6 @@ //! A collection of variables that are accessible outside of the network thread itself. use crate::peer_manager::PeerDB; +use crate::rpc::MetaData; use crate::types::SyncState; use crate::Client; use crate::EnrExt; @@ -22,6 +23,8 @@ pub struct NetworkGlobals { pub listen_port_udp: AtomicU16, /// The collection of known peers. pub peers: RwLock>, + // The local meta data of our node. + pub local_metadata: RwLock>, /// The current gossipsub topic subscriptions. pub gossipsub_subscriptions: RwLock>, /// The current sync status of the node. @@ -29,7 +32,13 @@ pub struct NetworkGlobals { } impl NetworkGlobals { - pub fn new(enr: Enr, tcp_port: u16, udp_port: u16, log: &slog::Logger) -> Self { + pub fn new( + enr: Enr, + tcp_port: u16, + udp_port: u16, + local_metadata: MetaData, + log: &slog::Logger, + ) -> Self { NetworkGlobals { local_enr: RwLock::new(enr.clone()), peer_id: RwLock::new(enr.peer_id()), @@ -37,6 +46,7 @@ impl NetworkGlobals { listen_port_tcp: AtomicU16::new(tcp_port), listen_port_udp: AtomicU16::new(udp_port), peers: RwLock::new(PeerDB::new(log)), + local_metadata: RwLock::new(local_metadata), gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), } diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index 5e1ef76a2..46c11fd58 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -8,6 +8,8 @@ mod tests { migrate::NullMigrator, }; use eth2_libp2p::discovery::{build_enr, Keypair}; + use eth2_libp2p::rpc::methods::MetaData; + use eth2_libp2p::types::EnrBitfield; use eth2_libp2p::{ discovery::CombinedKey, CombinedKeyExt, NetworkConfig, NetworkGlobals, SubnetDiscovery, }; @@ -102,7 +104,14 @@ mod tests { let enr_key = CombinedKey::from_libp2p(&Keypair::generate_secp256k1()).unwrap(); let enr = build_enr::(&enr_key, &config, EnrForkId::default()).unwrap(); - let network_globals: NetworkGlobals = NetworkGlobals::new(enr, 0, 0, &log); + // Default metadata + let meta_data = MetaData { + seq_number: 0, + attnets: EnrBitfield::::default(), + }; + + let network_globals: NetworkGlobals = + NetworkGlobals::new(enr, 0, 0, meta_data, &log); AttestationService::new(beacon_chain, Arc::new(network_globals), &log) }