From 58111cddb20bbd6776f0fa079cbcb07c4b7cac62 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 24 Mar 2020 21:45:53 +1100 Subject: [PATCH] Adds ENR "eth2" field and Fork logic to networking (#953) * Merge #913 * Correct release tests * Completed release test corrections * Initial work on upgrading discovery * Updates discovery to latest version * Update ENR initialisation logic * Remove debug statements * Shifts timing units to slots * Initial work * Add initial fork versioning and EnrForkId * Correct linking for EnrForkId * Adds eth2 field to local ENR * Initial work to eth2 field integration * Integrate eth2 field into discovery * temp commit * Add a timer to adjust fork versions during a hard fork for the ENR --- Cargo.lock | 55 +++--- beacon_node/beacon_chain/Cargo.toml | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 51 +++++ beacon_node/beacon_chain/src/builder.rs | 9 + beacon_node/client/src/builder.rs | 6 +- beacon_node/client/src/config.rs | 3 + beacon_node/eth2-libp2p/Cargo.toml | 3 +- beacon_node/eth2-libp2p/src/behaviour.rs | 181 +++++++++--------- beacon_node/eth2-libp2p/src/config.rs | 5 + .../eth2-libp2p/src/discovery/enr_helpers.rs | 121 ++++++++++++ .../src/{discovery.rs => discovery/mod.rs} | 156 +++++---------- beacon_node/eth2-libp2p/tests/noise.rs | 2 +- beacon_node/fork/src/lib.rs | 14 +- beacon_node/network/src/service.rs | 34 +++- beacon_node/src/cli.rs | 8 + beacon_node/src/config.rs | 13 ++ beacon_node/src/lib.rs | 4 +- 17 files changed, 431 insertions(+), 235 deletions(-) create mode 100644 beacon_node/eth2-libp2p/src/discovery/enr_helpers.rs rename beacon_node/eth2-libp2p/src/{discovery.rs => discovery/mod.rs} (72%) diff --git a/Cargo.lock b/Cargo.lock index a5cb39f40..8b8a0b9db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -234,6 +234,7 @@ dependencies = [ "eth2_ssz", "eth2_ssz_derive", "eth2_ssz_types", + "fork", "futures", "genesis", "integer-sqrt", @@ -1089,7 +1090,7 @@ dependencies = [ [[package]] name = "enr" version = "0.1.0-alpha.3" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "base64 0.12.0", "bs58 0.3.0", @@ -2057,7 +2058,7 @@ dependencies = [ [[package]] name = "libp2p" version = "0.13.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "bytes", "futures", @@ -2096,7 +2097,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.13.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "asn1_der", "bs58 0.3.0", @@ -2131,7 +2132,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "quote 1.0.3", "syn 1.0.16", @@ -2140,7 +2141,7 @@ dependencies = [ [[package]] name = "libp2p-deflate" version = "0.5.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "flate2", "futures", @@ -2151,7 +2152,7 @@ dependencies = [ [[package]] name = "libp2p-discv5" version = "0.1.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "arrayvec 0.4.12", "bigint", @@ -2182,7 +2183,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "futures", "libp2p-core", @@ -2193,7 +2194,7 @@ dependencies = [ [[package]] name = "libp2p-floodsub" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "bs58 0.3.0", "bytes", @@ -2211,7 +2212,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.1.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "base64 0.10.1", "bs58 0.2.5", @@ -2236,7 +2237,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.13.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "bytes", "futures", @@ -2255,7 +2256,7 @@ dependencies = [ [[package]] name = "libp2p-kad" version = "0.13.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "arrayvec 0.5.1", "bytes", @@ -2282,7 +2283,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "data-encoding", "dns-parser", @@ -2304,7 +2305,7 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "bytes", "fnv", @@ -2320,7 +2321,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.11.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "bytes", "curve25519-dalek 1.2.3", @@ -2340,7 +2341,7 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "bytes", "futures", @@ -2357,7 +2358,7 @@ dependencies = [ [[package]] name = "libp2p-plaintext" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "bytes", "futures", @@ -2372,7 +2373,7 @@ dependencies = [ [[package]] name = "libp2p-secio" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "aes-ctr", "bytes", @@ -2401,7 +2402,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.3.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "futures", "libp2p-core", @@ -2414,7 +2415,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "bytes", "futures", @@ -2430,7 +2431,7 @@ dependencies = [ [[package]] name = "libp2p-uds" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "futures", "libp2p-core", @@ -2441,7 +2442,7 @@ dependencies = [ [[package]] name = "libp2p-wasm-ext" version = "0.6.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "futures", "js-sys", @@ -2455,7 +2456,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "bytes", "futures", @@ -2473,7 +2474,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "futures", "libp2p-core", @@ -2773,7 +2774,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.6.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "bytes", "futures", @@ -3006,7 +3007,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.6.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "arrayref", "bs58 0.3.0", @@ -3023,7 +3024,7 @@ dependencies = [ [[package]] name = "parity-multihash" version = "0.2.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "blake2", "bytes", @@ -3799,7 +3800,7 @@ dependencies = [ [[package]] name = "rw-stream-sink" version = "0.1.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6#0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" +source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" dependencies = [ "bytes", "futures", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 428d6b79b..6a0f566ce 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -12,6 +12,7 @@ write_ssz_files = [] # Writes debugging .ssz files to /tmp during block process eth2_config = { path = "../../eth2/utils/eth2_config" } merkle_proof = { path = "../../eth2/utils/merkle_proof" } store = { path = "../store" } +fork = { path = "../fork" } parking_lot = "0.9.0" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d36a08fae..06000b0b4 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -14,6 +14,7 @@ use crate::snapshot_cache::SnapshotCache; use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::BeaconSnapshot; +use ::fork::{next_fork_epoch, next_fork_version}; use operation_pool::{OperationPool, PersistedOperationPool}; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; @@ -160,6 +161,8 @@ pub struct BeaconChain { pub(crate) shuffling_cache: TimeoutRwLock, /// Caches a map of `validator_index -> validator_pubkey`. pub(crate) validator_pubkey_cache: TimeoutRwLock, + /// A list of any hard-coded forks that have been disabled. + pub disabled_forks: Vec, /// Logging to CLI, etc. pub(crate) log: Logger, } @@ -2044,6 +2047,54 @@ impl BeaconChain { Ok(dump) } + + /// Gets the current EnrForkId. + /// + /// v0.11 + pub fn enr_fork_id(&self) -> Result { + Ok(EnrForkId { + // TODO: To be implemented with v0.11 updates + fork_digest: [0, 0, 0, 0], + next_fork_version: next_fork_version(self.slot()?, &self.disabled_forks), + next_fork_epoch: next_fork_epoch::( + &self.spec, + self.slot()?, + &self.disabled_forks, + ), + }) + } + + /// Calculates the duration (in millis) to the next fork, if one exists. + /// + /// This is required by the network thread to instantiate timeouts to update networking + /// constants + pub fn duration_to_next_fork(&self) -> Result, Error> { + let current_slot = self.slot()?; + let next_fork_epoch = + next_fork_epoch::(&self.spec, current_slot, &self.disabled_forks); + if next_fork_epoch != self.spec.far_future_epoch { + // There is an upcoming fork + let current_epoch = self.slot()?.epoch(T::EthSpec::slots_per_epoch()); + let epochs_until_fork = next_fork_epoch + .saturating_sub(current_epoch) + .saturating_sub(1u64); + let millis_until_fork = T::EthSpec::slots_per_epoch() + * self.spec.milliseconds_per_slot + * epochs_until_fork.as_u64(); + Ok(Some(tokio::timer::Delay::new( + Instant::now() + + self + .slot_clock + .duration_to_next_epoch(T::EthSpec::slots_per_epoch()) + .unwrap_or_else(|| Duration::from_secs(0)) + + Duration::from_millis(millis_until_fork) + // add a short timeout to start within the new fork period + + Duration::from_millis(200), + ))) + } else { + Ok(None) + } + } } impl Drop for BeaconChain { diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 72a61f542..f99fcfdbc 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -87,6 +87,7 @@ pub struct BeaconChainBuilder { pubkey_cache_path: Option, validator_pubkey_cache: Option, spec: ChainSpec, + disabled_forks: Vec, log: Option, } @@ -121,6 +122,7 @@ where head_tracker: None, pubkey_cache_path: None, data_dir: None, + disabled_forks: Vec::new(), validator_pubkey_cache: None, spec: TEthSpec::default_spec(), log: None, @@ -167,6 +169,12 @@ where self } + /// Sets a list of hard-coded forks that will not be activated. + pub fn disabled_forks(mut self, disabled_forks: Vec) -> Self { + self.disabled_forks = disabled_forks; + self + } + /// Attempt to load an existing eth1 cache from the builder's `Store`. pub fn get_persisted_eth1_backend(&self) -> Result, String> { let store = self @@ -425,6 +433,7 @@ where )), shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()), validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), + disabled_forks: self.disabled_forks, log: log.clone(), }; diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 7e0f76519..ab7897ce8 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -125,6 +125,7 @@ where let runtime_context = self.runtime_context.clone(); let eth_spec_instance = self.eth_spec_instance.clone(); let data_dir = config.data_dir.clone(); + let disabled_forks = config.disabled_forks.clone(); future::ok(()) .and_then(move |()| { @@ -146,7 +147,8 @@ where .store(store) .store_migrator(store_migrator) .data_dir(data_dir) - .custom_spec(spec.clone()); + .custom_spec(spec.clone()) + .disabled_forks(disabled_forks); Ok((builder, spec, context)) }) @@ -250,7 +252,7 @@ where } /// Immediately starts the networking stack. - pub fn network(mut self, config: &NetworkConfig) -> Result { + pub fn network(mut self, config: &mut NetworkConfig) -> Result { let beacon_chain = self .beacon_chain .clone() diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 3335ea123..c4b3a0823 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -58,6 +58,8 @@ pub struct Config { /// This is the method used for the 2019 client interop in Canada. pub dummy_eth1_backend: bool, pub sync_eth1_chain: bool, + /// A list of hard-coded forks that will be disabled. + pub disabled_forks: Vec, #[serde(skip)] /// The `genesis` field is not serialized or deserialized by `serde` to ensure it is defined /// via the CLI at runtime, instead of from a configuration file saved to disk. @@ -86,6 +88,7 @@ impl Default for Config { dummy_eth1_backend: false, sync_eth1_chain: false, eth1: <_>::default(), + disabled_forks: Vec::new(), } } } diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 546a31fd8..dd979a01c 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" hex = "0.3" # rust-libp2p is presently being sourced from a Sigma Prime fork of the # `libp2p/rust-libp2p` repository. -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "0cb9d504c7be6a7bcfc87feeafdb6847d8083fc6" } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "8c272a9a4d115d9a1d33791479527cdcba781829" } types = { path = "../../eth2/types" } serde = "1.0.102" serde_derive = "1.0.102" @@ -16,6 +16,7 @@ eth2_ssz = "0.1.2" eth2_ssz_derive = "0.1.0" slog = { version = "2.5.2", features = ["max_level_trace"] } version = { path = "../version" } +# beacon_chain = { path = "../beacon_chain" } tokio = "0.1.22" futures = "0.1.29" error-chain = "0.12.1" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 5349772fa..4d637bcf1 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -15,7 +15,7 @@ use libp2p::{ use lru::LruCache; use slog::{debug, o, warn}; use std::sync::Arc; -use types::EthSpec; +use types::{EnrForkId, EthSpec}; const MAX_IDENTIFY_ADDRESSES: usize = 20; @@ -87,6 +87,96 @@ impl Behaviour Behaviour { + /* Pubsub behaviour functions */ + + /// Subscribes to a gossipsub topic. + pub fn subscribe(&mut self, topic: GossipTopic) -> bool { + if !self + .network_globals + .gossipsub_subscriptions + .read() + .contains(&topic) + { + self.network_globals + .gossipsub_subscriptions + .write() + .push(topic.clone()); + } + self.gossipsub.subscribe(topic.into()) + } + + /// Unsubscribe from a gossipsub topic. + pub fn unsubscribe(&mut self, topic: GossipTopic) -> bool { + let pos = self + .network_globals + .gossipsub_subscriptions + .read() + .iter() + .position(|s| s == &topic); + if let Some(pos) = pos { + self.network_globals + .gossipsub_subscriptions + .write() + .swap_remove(pos); + } + self.gossipsub.unsubscribe(topic.into()) + } + + /// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding. + pub fn publish(&mut self, messages: Vec>) { + for message in messages { + for topic in message.topics() { + let message_data = message.encode(); + self.gossipsub.publish(&topic.into(), message_data); + } + } + } + + /// Forwards a message that is waiting in gossipsub's mcache. Messages are only propagated + /// once validated by the beacon chain. + pub fn propagate_message(&mut self, propagation_source: &PeerId, message_id: MessageId) { + self.gossipsub + .propagate_message(&message_id, propagation_source); + } + + /* Eth2 RPC behaviour functions */ + + /// Sends an RPC Request/Response via the RPC protocol. + pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { + self.eth2_rpc.send_rpc(peer_id, rpc_event); + } + + /* Discovery / Peer management functions */ + + /// Notify discovery that the peer has been banned. + pub fn peer_banned(&mut self, peer_id: PeerId) { + self.discovery.peer_banned(peer_id); + } + + /// Notify discovery that the peer has been unbanned. + pub fn peer_unbanned(&mut self, peer_id: &PeerId) { + self.discovery.peer_unbanned(peer_id); + } + + /// Returns an iterator over all enr entries in the DHT. + pub fn enr_entries(&mut self) -> impl Iterator { + self.discovery.enr_entries() + } + + /// Add an ENR to the routing table of the discovery mechanism. + pub fn add_enr(&mut self, enr: Enr) { + self.discovery.add_enr(enr); + } + + /// Updates the local ENR's "eth2" field with the latest EnrForkId. + pub fn update_fork_version(&mut self, enr_fork_id: EnrForkId) { + self.discovery.update_eth2_enr(enr_fork_id); + // TODO: Handle gossipsub fork update + } +} + // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour impl NetworkBehaviourEventProcess for Behaviour @@ -194,95 +284,6 @@ impl NetworkBehaviourEventPr } } -/// Implements the combined behaviour for the libp2p service. -impl Behaviour { - /* Pubsub behaviour functions */ - - /// Subscribes to a gossipsub topic. - pub fn subscribe(&mut self, topic: GossipTopic) -> bool { - if !self - .network_globals - .gossipsub_subscriptions - .read() - .contains(&topic) - { - self.network_globals - .gossipsub_subscriptions - .write() - .push(topic.clone()); - } - self.gossipsub.subscribe(topic.into()) - } - - /// Unsubscribe from a gossipsub topic. - pub fn unsubscribe(&mut self, topic: GossipTopic) -> bool { - let pos = self - .network_globals - .gossipsub_subscriptions - .read() - .iter() - .position(|s| s == &topic); - if let Some(pos) = pos { - self.network_globals - .gossipsub_subscriptions - .write() - .swap_remove(pos); - } - self.gossipsub.unsubscribe(topic.into()) - } - - /// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding. - pub fn publish(&mut self, messages: Vec>) { - for message in messages { - for topic in message.topics() { - let message_data = message.encode(); - self.gossipsub.publish(&topic.into(), message_data); - } - } - } - - /// Forwards a message that is waiting in gossipsub's mcache. Messages are only propagated - /// once validated by the beacon chain. - pub fn propagate_message(&mut self, propagation_source: &PeerId, message_id: MessageId) { - self.gossipsub - .propagate_message(&message_id, propagation_source); - } - - /* Eth2 RPC behaviour functions */ - - /// Sends an RPC Request/Response via the RPC protocol. - pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { - self.eth2_rpc.send_rpc(peer_id, rpc_event); - } - - /* Discovery / Peer management functions */ - - /// The current number of connected libp2p peers. - pub fn connected_peers(&self) -> usize { - self.network_globals.connected_peers() - } - - /// Notify discovery that the peer has been banned. - pub fn peer_banned(&mut self, peer_id: PeerId) { - self.discovery.peer_banned(peer_id); - } - - /// Notify discovery that the peer has been unbanned. - pub fn peer_unbanned(&mut self, peer_id: &PeerId) { - self.discovery.peer_unbanned(peer_id); - } - - /// Returns an iterator over all enr entries in the DHT. - pub fn enr_entries(&mut self) -> impl Iterator { - self.discovery.enr_entries() - } - - /// Add an ENR to the routing table of the discovery mechanism. - pub fn add_enr(&mut self, enr: Enr) { - self.discovery.add_enr(enr); - } -} - /// The types of events than can be obtained from polling the behaviour. pub enum BehaviourEvent { /// A received RPC event and the peer that it was received from. diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index 57c90cc9a..9de08f36e 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -7,6 +7,7 @@ use serde_derive::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use std::path::PathBuf; use std::time::Duration; +use types::EnrForkId; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] @@ -63,6 +64,9 @@ pub struct Config { /// List of extra topics to initially subscribe to as strings. pub topics: Vec, + /// The initial ENR fork id. + pub enr_fork_id: EnrForkId, + /// Introduces randomization in network propagation of messages. This should only be set for /// testing purposes and will likely be removed in future versions. // TODO: Remove this functionality for mainnet @@ -132,6 +136,7 @@ impl Default for Config { libp2p_nodes: vec![], client_version: version::version(), topics, + enr_fork_id: EnrForkId::default(), propagation_percentage: None, } } diff --git a/beacon_node/eth2-libp2p/src/discovery/enr_helpers.rs b/beacon_node/eth2-libp2p/src/discovery/enr_helpers.rs new file mode 100644 index 000000000..203f9ac84 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/discovery/enr_helpers.rs @@ -0,0 +1,121 @@ +use super::ENR_FILENAME; +use crate::Enr; +use crate::NetworkConfig; +use libp2p::core::identity::Keypair; +use libp2p::discv5::enr::{CombinedKey, EnrBuilder}; +use slog::{debug, warn}; +use ssz::Encode; +use std::convert::TryInto; +use std::fs::File; +use std::io::prelude::*; +use std::path::Path; +use std::str::FromStr; + +/// Loads an ENR from file if it exists and matches the current NodeId and sequence number. If none +/// exists, generates a new one. +/// +/// If an ENR exists, with the same NodeId, this function checks to see if the loaded ENR from +/// disk is suitable to use, otherwise we increment our newly generated ENR's sequence number. +pub fn build_or_load_enr( + local_key: Keypair, + config: &NetworkConfig, + log: &slog::Logger, +) -> Result { + // Build the local ENR. + // Note: Discovery should update the ENR record's IP to the external IP as seen by the + // majority of our peers, if the CLI doesn't expressly forbid it. + let enr_key: CombinedKey = local_key + .try_into() + .map_err(|_| "Invalid key type for ENR records")?; + + let mut local_enr = build_enr(&enr_key, config)?; + + let enr_f = config.network_dir.join(ENR_FILENAME); + if let Ok(mut enr_file) = File::open(enr_f.clone()) { + let mut enr_string = String::new(); + match enr_file.read_to_string(&mut enr_string) { + Err(_) => debug!(log, "Could not read ENR from file"), + Ok(_) => { + match Enr::from_str(&enr_string) { + Ok(disk_enr) => { + // if the same node id, then we may need to update our sequence number + if local_enr.node_id() == disk_enr.node_id() { + if compare_enr(&local_enr, &disk_enr) { + debug!(log, "ENR loaded from disk"; "file" => format!("{:?}", enr_f)); + // the stored ENR has the same configuration, use it + return Ok(disk_enr); + } + + // same node id, different configuration - update the sequence number + let new_seq_no = disk_enr.seq().checked_add(1).ok_or_else(|| "ENR sequence number on file is too large. Remove it to generate a new NodeId")?; + local_enr.set_seq(new_seq_no, &enr_key).map_err(|e| { + format!("Could not update ENR sequence number: {:?}", e) + })?; + debug!(log, "ENR sequence number increased"; "seq" => new_seq_no); + } + } + Err(e) => { + warn!(log, "ENR from file could not be decoded"; "error" => format!("{:?}", e)); + } + } + } + } + } + + save_enr_to_disk(&config.network_dir, &local_enr, log); + + Ok(local_enr) +} + +/// Builds a lighthouse ENR given a `NetworkConfig`. +fn build_enr(enr_key: &CombinedKey, config: &NetworkConfig) -> Result { + let mut builder = EnrBuilder::new("v4"); + if let Some(enr_address) = config.enr_address { + builder.ip(enr_address); + } + if let Some(udp_port) = config.enr_udp_port { + builder.udp(udp_port); + } + // we always give it our listening tcp port + // TODO: Add uPnP support to map udp and tcp ports + let tcp_port = config.enr_tcp_port.unwrap_or_else(|| config.libp2p_port); + builder.tcp(tcp_port); + + // set the `eth2` field on our ENR + builder.add_value("eth2".into(), config.enr_fork_id.as_ssz_bytes()); + + builder + .tcp(config.libp2p_port) + .build(enr_key) + .map_err(|e| format!("Could not build Local ENR: {:?}", e)) +} + +/// Defines the conditions under which we use the locally built ENR or the one stored on disk. +/// If this function returns true, we use the `disk_enr`. +fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool { + // take preference over disk_enr address if one is not specified + (local_enr.ip().is_none() || local_enr.ip() == disk_enr.ip()) + // tcp ports must match + && local_enr.tcp() == disk_enr.tcp() + && local_enr.get("eth2") == disk_enr.get("eth2") + // take preference over disk udp port if one is not specified + && (local_enr.udp().is_none() || local_enr.udp() == disk_enr.udp()) +} + +/// Saves an ENR to disk +pub fn save_enr_to_disk(dir: &Path, enr: &Enr, log: &slog::Logger) { + let _ = std::fs::create_dir_all(dir); + match File::create(dir.join(Path::new(ENR_FILENAME))) + .and_then(|mut f| f.write_all(&enr.to_base64().as_bytes())) + { + Ok(_) => { + debug!(log, "ENR written to disk"); + } + Err(e) => { + warn!( + log, + "Could not write ENR to file"; "file" => format!("{:?}{:?}",dir, ENR_FILENAME), "error" => format!("{}", e) + ); + } + } +} diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs similarity index 72% rename from beacon_node/eth2-libp2p/src/discovery.rs rename to beacon_node/eth2-libp2p/src/discovery/mod.rs index 2023d7cb3..6b5593ffc 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -1,29 +1,25 @@ +///! This manages the discovery and management of peers. +mod enr_helpers; + use crate::metrics; use crate::Enr; use crate::{error, NetworkConfig, NetworkGlobals, PeerInfo}; -/// This manages the discovery and management of peers. -/// -/// Currently using discv5 for peer discovery. -/// use futures::prelude::*; use libp2p::core::{identity::Keypair, ConnectedPoint, Multiaddr, PeerId}; -use libp2p::discv5::enr::{CombinedKey, EnrBuilder, NodeId}; +use libp2p::discv5::enr::NodeId; use libp2p::discv5::{Discv5, Discv5Event}; use libp2p::multiaddr::Protocol; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler}; use slog::{debug, info, warn}; +use ssz::Encode; use std::collections::HashSet; -use std::convert::TryInto; -use std::fs::File; -use std::io::prelude::*; use std::net::SocketAddr; use std::path::Path; -use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::timer::Delay; -use types::EthSpec; +use types::{EnrForkId, EthSpec}; /// Maximum seconds before searching for extra peers. const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 120; @@ -76,7 +72,7 @@ impl Discovery { let log = log.clone(); // checks if current ENR matches that found on disk - let local_enr = load_enr(local_key.clone(), config, &log)?; + let local_enr = enr_helpers::build_or_load_enr(local_key.clone(), config, &log)?; *network_globals.local_enr.write() = Some(local_enr.clone()); @@ -105,7 +101,13 @@ impl Discovery { "node_id" => format!("{}", bootnode_enr.node_id()), "peer_id" => format!("{}", bootnode_enr.peer_id()) ); - discovery.add_enr(bootnode_enr); + let _ = discovery.add_enr(bootnode_enr).map_err(|e| { + warn!( + log, + "Could not add peer to the local routing table"; + "error" => format!("{}", e) + ) + }); } Ok(Self { @@ -135,7 +137,13 @@ impl Discovery { /// Add an ENR to the routing table of the discovery mechanism. pub fn add_enr(&mut self, enr: Enr) { - self.discovery.add_enr(enr); + let _ = self.discovery.add_enr(enr).map_err(|e| { + warn!( + self.log, + "Could not add peer to the local routing table"; + "error" => format!("{}", e) + ) + }); } /// The peer has been banned. Add this peer to the banned list to prevent any future @@ -154,6 +162,34 @@ impl Discovery { self.discovery.enr_entries() } + /// Updates the `eth2` field of our local ENR. + pub fn update_eth2_enr(&mut self, enr_fork_id: EnrForkId) { + // to avoid having a reference to the spec constant, for the logging we assume + // FAR_FUTURE_EPOCH is u64::max_value() + let next_fork_epoch_log = if enr_fork_id.next_fork_epoch == u64::max_value() { + String::from("No other fork") + } else { + format!("{:?}", enr_fork_id.next_fork_epoch) + }; + + info!(self.log, "Updating the ENR fork version"; + "fork_digest" => format!("{:?}", enr_fork_id.fork_digest), + "next_fork_version" => format!("{:?}", enr_fork_id.next_fork_version), + "next_fork_epoch" => next_fork_epoch_log, + ); + + let _ = self + .discovery + .enr_insert("eth2".into(), enr_fork_id.as_ssz_bytes()) + .map_err(|e| { + warn!( + self.log, + "Could not update eth2 ENR field"; + "error" => format!("{:?}", e) + ) + }); + } + /// Search for new peers using the underlying discovery mechanism. fn find_peers(&mut self) { // pick a random NodeId @@ -268,13 +304,14 @@ where let mut address = Multiaddr::from(socket.ip()); address.push(Protocol::Tcp(self.tcp_port)); let enr = self.discovery.local_enr(); - save_enr_to_disc(Path::new(&self.enr_dir), enr, &self.log); + enr_helpers::save_enr_to_disk(Path::new(&self.enr_dir), enr, &self.log); return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address, }); } Discv5Event::FindNodeResult { closer_peers, .. } => { + // TODO: Modify once ENR predicate search is available debug!(self.log, "Discovery query completed"; "peers_found" => closer_peers.len()); // update the time to the next query if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES { @@ -320,94 +357,3 @@ where Async::NotReady } } - -/// Loads an ENR from file if it exists and matches the current NodeId and sequence number. If none -/// exists, generates a new one. -/// -/// If an ENR exists, with the same NodeId and IP address, we use the disk-generated one as its -/// ENR sequence will be equal or higher than a newly generated one. -fn load_enr(local_key: Keypair, config: &NetworkConfig, log: &slog::Logger) -> Result { - // Build the local ENR. - // Note: Discovery should update the ENR record's IP to the external IP as seen by the - // majority of our peers, if the CLI doesn't expressly forbid it. - let enr_key: CombinedKey = local_key - .try_into() - .map_err(|_| "Invalid key type for ENR records")?; - - let mut local_enr = { - let mut builder = EnrBuilder::new("v4"); - if let Some(enr_address) = config.enr_address { - builder.ip(enr_address); - } - if let Some(udp_port) = config.enr_udp_port { - builder.udp(udp_port); - } - // we always give it our listening tcp port - // TODO: Add uPnP support to map udp and tcp ports - let tcp_port = config.enr_tcp_port.unwrap_or_else(|| config.libp2p_port); - builder.tcp(tcp_port); - - builder - .tcp(config.libp2p_port) - .build(&enr_key) - .map_err(|e| format!("Could not build Local ENR: {:?}", e))? - }; - - let enr_f = config.network_dir.join(ENR_FILENAME); - if let Ok(mut enr_file) = File::open(enr_f.clone()) { - let mut enr_string = String::new(); - match enr_file.read_to_string(&mut enr_string) { - Err(_) => debug!(log, "Could not read ENR from file"), - Ok(_) => { - match Enr::from_str(&enr_string) { - Ok(enr) => { - let tcp_port = config.enr_tcp_port.unwrap_or_else(|| config.libp2p_port); - if enr.node_id() == local_enr.node_id() { - if (config.enr_address.is_none() - || enr.ip().map(Into::into) == config.enr_address) - && enr.tcp() == Some(tcp_port) - && (config.enr_udp_port.is_none() - || enr.udp() == config.enr_udp_port) - { - debug!(log, "ENR loaded from file"; "file" => format!("{:?}", enr_f)); - // the stored ENR has the same configuration, use it - return Ok(enr); - } - - // same node id, different configuration - update the sequence number - let new_seq_no = enr.seq().checked_add(1).ok_or_else(|| "ENR sequence number on file is too large. Remove it to generate a new NodeId")?; - local_enr.set_seq(new_seq_no, &enr_key).map_err(|e| { - format!("Could not update ENR sequence number: {:?}", e) - })?; - debug!(log, "ENR sequence number increased"; "seq" => new_seq_no); - } - } - Err(e) => { - warn!(log, "ENR from file could not be decoded"; "error" => format!("{:?}", e)); - } - } - } - } - } - - save_enr_to_disc(&config.network_dir, &local_enr, log); - - Ok(local_enr) -} - -fn save_enr_to_disc(dir: &Path, enr: &Enr, log: &slog::Logger) { - let _ = std::fs::create_dir_all(dir); - match File::create(dir.join(Path::new(ENR_FILENAME))) - .and_then(|mut f| f.write_all(&enr.to_base64().as_bytes())) - { - Ok(_) => { - debug!(log, "ENR written to disk"); - } - Err(e) => { - warn!( - log, - "Could not write ENR to file"; "file" => format!("{:?}{:?}",dir, ENR_FILENAME), "error" => format!("{}", e) - ); - } - } -} diff --git a/beacon_node/eth2-libp2p/tests/noise.rs b/beacon_node/eth2-libp2p/tests/noise.rs index 4e02648ec..52e05438e 100644 --- a/beacon_node/eth2-libp2p/tests/noise.rs +++ b/beacon_node/eth2-libp2p/tests/noise.rs @@ -117,7 +117,7 @@ fn build_secio_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMux fn test_secio_noise_fallback() { // set up the logging. The level and enabled logging or not let log_level = Level::Trace; - let enable_logging = true; + let enable_logging = false; let log = common::build_log(log_level, enable_logging); diff --git a/beacon_node/fork/src/lib.rs b/beacon_node/fork/src/lib.rs index d4f0c7133..a85f66055 100644 --- a/beacon_node/fork/src/lib.rs +++ b/beacon_node/fork/src/lib.rs @@ -1,5 +1,5 @@ ///! Maintains a hard-coded list of known forks and their slots at which they were activated. -use types::{Epoch, EthSpec, Slot, FAR_FUTURE_EPOCH}; +use types::{ChainSpec, Epoch, EthSpec, Slot}; mod forks; @@ -7,7 +7,7 @@ mod forks; /// number. /// /// The disabled_forks parameter select which forks are disabled by their name. -pub fn current_fork_version(slot: Slot, disabled_forks: Vec) -> [u8; 4] { +pub fn current_fork_version(slot: Slot, disabled_forks: &[String]) -> [u8; 4] { let mut version = [0, 0, 0, 0]; for (fork_name, fork_slot_no, fork_version) in forks::KNOWN_FORKS.iter() { if *fork_slot_no <= slot.as_u64() { @@ -25,7 +25,7 @@ pub fn current_fork_version(slot: Slot, disabled_forks: Vec) -> [u8; 4] version } -pub fn next_fork_version(slot: Slot, disabled_forks: Vec) -> [u8; 4] { +pub fn next_fork_version(slot: Slot, disabled_forks: &[String]) -> [u8; 4] { let mut version = None; for (fork_name, fork_slot_no, fork_version) in forks::KNOWN_FORKS.iter() { if *fork_slot_no > slot.as_u64() { @@ -48,7 +48,11 @@ pub fn next_fork_version(slot: Slot, disabled_forks: Vec) -> [u8; 4] { } } -pub fn next_fork_epoch(slot: Slot, disabled_forks: Vec) -> Epoch { +pub fn next_fork_epoch( + spec: &ChainSpec, + slot: Slot, + disabled_forks: &[String], +) -> Epoch { let mut next_fork_slot = None; for (fork_name, fork_slot_no, _fork_version) in forks::KNOWN_FORKS.iter() { if *fork_slot_no > slot.as_u64() { @@ -66,6 +70,6 @@ pub fn next_fork_epoch(slot: Slot, disabled_forks: Vec) -> E if let Some(fork_slot) = next_fork_slot { fork_slot.epoch(T::slots_per_epoch()) } else { - FAR_FUTURE_EPOCH + Epoch::from(spec.far_future_epoch) } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index be1035f6b..319d8de53 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -27,6 +27,8 @@ const BAN_PEER_TIMEOUT: u64 = 30; /// Service that handles communication between internal services and the `eth2_libp2p` network service. pub struct NetworkService { + /// A reference to the underlying beacon chain. + beacon_chain: Arc>, /// The underlying libp2p service that drives all the network interactions. libp2p: LibP2PService, /// An attestation and subnet manager service. @@ -42,6 +44,8 @@ pub struct NetworkService { network_globals: Arc>, /// An initial delay to update variables after the libp2p service has started. initial_delay: Delay, + /// A delay that expires when a new fork takes place. + next_fork_update: Option, /// The logger for the network service. log: slog::Logger, /// A probability of propagation. @@ -51,7 +55,7 @@ pub struct NetworkService { impl NetworkService { pub fn start( beacon_chain: Arc>, - config: &NetworkConfig, + config: &mut NetworkConfig, executor: &TaskExecutor, network_log: slog::Logger, ) -> error::Result<( @@ -72,6 +76,17 @@ impl NetworkService { )?; let propagation_percentage = config.propagation_percentage; + + // set the local enr_fork_id + config.enr_fork_id = beacon_chain + .enr_fork_id() + .map_err(|e| format!("Could not get the current ENR fork version: {:?}", e))?; + + // keep track of when our fork_id needs to be updated + let next_fork_update = beacon_chain + .duration_to_next_fork() + .map_err(|e| format!("Could not get the next fork update duration: {:?}", e))?; + // launch libp2p service let (network_globals, mut libp2p) = LibP2PService::new(config, network_log.clone())?; @@ -85,10 +100,11 @@ impl NetworkService { // create the attestation service let attestation_service = - AttestationService::new(beacon_chain, network_globals.clone(), &network_log); + AttestationService::new(beacon_chain.clone(), network_globals.clone(), &network_log); // create the network service and spawn the task let network_service = NetworkService { + beacon_chain, libp2p, attestation_service, network_recv, @@ -96,6 +112,7 @@ impl NetworkService { store, network_globals: network_globals.clone(), initial_delay, + next_fork_update, log: network_log, propagation_percentage, }; @@ -118,6 +135,7 @@ fn spawn_service( let log = &service.log; + // handles any logic which requires an initial delay if !service.initial_delay.is_elapsed() { if let Ok(Async::Ready(_)) = service.initial_delay.poll() { let multi_addrs = Swarm::listeners(&service.libp2p.swarm).cloned().collect(); @@ -306,6 +324,18 @@ fn spawn_service( ); } + // if we have just forked, update inform the libp2p layer + if let Some(mut update_fork_delay) = service.next_fork_update.take() { + if !update_fork_delay.is_elapsed() { + if let Ok(Async::Ready(_)) = update_fork_delay.poll() { + if let Ok(enr_fork_id) = service.beacon_chain.enr_fork_id() { + service.libp2p.swarm.update_fork_version(enr_fork_id); + } + service.next_fork_update = service.beacon_chain.duration_to_next_fork().unwrap_or_else(|_| None); + } + } + } + Ok(Async::NotReady) }) diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index a38d00f1a..94c92bbdd 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -150,6 +150,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { automatically.") .takes_value(true), ) + /* Client/chain related arguments */ + .arg( + Arg::with_name("disabled-forks") + .long("disabled-forks") + .value_name("STRING") + .help("A comma separated list of forks that will be disabled.") + .takes_value(true), + ) /* REST API related arguments */ .arg( Arg::with_name("http") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 3fa8339f2..4ff6be5fc 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -187,6 +187,19 @@ pub fn get_configs( client_config.network.secret_key_hex = Some(p2p_priv_key.to_string()); } + /* + * Chain specification + */ + if let Some(disabled_forks_str) = cli_args.value_of("disabled-forks") { + client_config.disabled_forks = disabled_forks_str + .split(',') + .map(|fork_name| { + fork_name + .parse() + .map_err(|_| format!("Invalid fork name: {}", fork_name)) + }) + .collect::>>()?; + } /* * Http server */ diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index e319f85f2..4774ee896 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -72,7 +72,7 @@ impl ProductionBeaconNode { /// Client behaviour is defined by the given `client_config`. pub fn new( context: RuntimeContext, - client_config: ClientConfig, + mut client_config: ClientConfig, ) -> impl Future { let http_eth2_config = context.eth2_config().clone(); let spec = context.eth2_config().spec.clone(); @@ -124,7 +124,7 @@ impl ProductionBeaconNode { .system_time_slot_clock()? .websocket_event_handler(client_config.websocket_server.clone())? .build_beacon_chain()? - .network(&client_config.network)? + .network(&mut client_config.network)? .notifier()?; let builder = if client_config.rest_api.enabled {