diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 6fff6278c..e70cda697 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -26,6 +26,7 @@ use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettin use libp2p::bandwidth::BandwidthSinks; use libp2p::gossipsub::{ self, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError, + TopicScoreParams, }; use libp2p::identify; use libp2p::multiaddr::{Multiaddr, Protocol as MProtocol}; @@ -618,6 +619,38 @@ impl Network { } } + /// Remove topic weight from all topics that don't have the given fork digest. + pub fn remove_topic_weight_except(&mut self, except: [u8; 4]) { + let new_param = TopicScoreParams { + topic_weight: 0.0, + ..Default::default() + }; + let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone(); + for topic in subscriptions + .iter() + .filter(|topic| topic.fork_digest != except) + { + let libp2p_topic: Topic = topic.clone().into(); + match self + .gossipsub_mut() + .set_topic_params(libp2p_topic, new_param.clone()) + { + Ok(_) => debug!(self.log, "Removed topic weight"; "topic" => %topic), + Err(e) => { + warn!(self.log, "Failed to remove topic weight"; "topic" => %topic, "error" => e) + } + } + } + } + + /// Returns the scoring parameters for a topic if set. + pub fn get_topic_params(&self, topic: GossipTopic) -> Option<&TopicScoreParams> { + self.swarm + .behaviour() + .gossipsub + .get_topic_params(&topic.into()) + } + /// Subscribes to a gossipsub topic. /// /// Returns `true` if the subscription was successful and `false` otherwise. diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 174a0ec14..aa92e0afd 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -215,15 +215,18 @@ pub struct NetworkService { } impl NetworkService { - #[allow(clippy::type_complexity)] - pub async fn start( + async fn build( beacon_chain: Arc>, config: &NetworkConfig, executor: task_executor::TaskExecutor, gossipsub_registry: Option<&'_ mut Registry>, beacon_processor_send: BeaconProcessorSend, beacon_processor_reprocess_tx: mpsc::Sender, - ) -> error::Result<(Arc>, NetworkSenders)> { + ) -> error::Result<( + NetworkService, + Arc>, + NetworkSenders, + )> { let network_log = executor.log().clone(); // build the channels for external comms let (network_senders, network_recievers) = NetworkSenders::new(); @@ -369,6 +372,28 @@ impl NetworkService { enable_light_client_server: config.enable_light_client_server, }; + Ok((network_service, network_globals, network_senders)) + } + + #[allow(clippy::type_complexity)] + pub async fn start( + beacon_chain: Arc>, + config: &NetworkConfig, + executor: task_executor::TaskExecutor, + gossipsub_registry: Option<&'_ mut Registry>, + beacon_processor_send: BeaconProcessorSend, + beacon_processor_reprocess_tx: mpsc::Sender, + ) -> error::Result<(Arc>, NetworkSenders)> { + let (network_service, network_globals, network_senders) = Self::build( + beacon_chain, + config, + executor.clone(), + gossipsub_registry, + beacon_processor_send, + beacon_processor_reprocess_tx, + ) + .await?; + network_service.spawn_service(executor); Ok((network_globals, network_senders)) @@ -882,9 +907,10 @@ impl NetworkService { fn update_next_fork(&mut self) { let new_enr_fork_id = self.beacon_chain.enr_fork_id(); + let new_fork_digest = new_enr_fork_id.fork_digest; let fork_context = &self.fork_context; - if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) { + if let Some(new_fork_name) = fork_context.from_context_bytes(new_fork_digest) { info!( self.log, "Transitioned to new fork"; @@ -907,6 +933,10 @@ impl NetworkService { Box::pin(next_fork_subscriptions_delay(&self.beacon_chain).into()); self.next_unsubscribe = Box::pin(Some(tokio::time::sleep(unsubscribe_delay)).into()); info!(self.log, "Network will unsubscribe from old fork gossip topics in a few epochs"; "remaining_epochs" => UNSUBSCRIBE_DELAY_EPOCHS); + + // Remove topic weight from old fork topics to prevent peers that left on the mesh on + // old topics from being penalized for not sending us messages. + self.libp2p.remove_topic_weight_except(new_fork_digest); } else { crit!(self.log, "Unknown new enr fork id"; "new_fork_id" => ?new_enr_fork_id); } diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 23bcf456d..35a7f1eab 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -4,14 +4,26 @@ mod tests { use crate::persisted_dht::load_dht; use crate::{NetworkConfig, NetworkService}; use beacon_chain::test_utils::BeaconChainHarness; - use beacon_processor::BeaconProcessorChannels; - use lighthouse_network::Enr; + use beacon_chain::BeaconChainTypes; + use beacon_processor::{BeaconProcessorChannels, BeaconProcessorConfig}; + use futures::StreamExt; + use lighthouse_network::types::{GossipEncoding, GossipKind}; + use lighthouse_network::{Enr, GossipTopic}; use slog::{o, Drain, Level, Logger}; use sloggers::{null::NullLoggerBuilder, Build}; use std::str::FromStr; use std::sync::Arc; use tokio::runtime::Runtime; - use types::MinimalEthSpec; + use types::{Epoch, EthSpec, ForkName, MinimalEthSpec, SubnetId}; + + impl NetworkService { + fn get_topic_params( + &self, + topic: GossipTopic, + ) -> Option<&lighthouse_network::libp2p::gossipsub::TopicScoreParams> { + self.libp2p.get_topic_params(topic) + } + } fn get_logger(actual_log: bool) -> Logger { if actual_log { @@ -102,4 +114,126 @@ mod tests { "should have persisted the second ENR to store" ); } + + // Test removing topic weight on old topics when a fork happens. + #[test] + fn test_removing_topic_weight_on_old_topics() { + let runtime = Arc::new(Runtime::new().unwrap()); + + // Capella spec + let mut spec = MinimalEthSpec::default_spec(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + spec.capella_fork_epoch = Some(Epoch::new(1)); + + // Build beacon chain. + let beacon_chain = BeaconChainHarness::builder(MinimalEthSpec) + .spec(spec.clone()) + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .mock_execution_layer() + .build() + .chain; + let (next_fork_name, _) = beacon_chain.duration_to_next_fork().expect("next fork"); + assert_eq!(next_fork_name, ForkName::Capella); + + // Build network service. + let (mut network_service, network_globals, _network_senders) = runtime.block_on(async { + let (_, exit) = exit_future::signal(); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let executor = task_executor::TaskExecutor::new( + Arc::downgrade(&runtime), + exit, + get_logger(false), + shutdown_tx, + ); + + let mut config = NetworkConfig::default(); + config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21214, 21214, 21215); + config.discv5_config.table_filter = |_| true; // Do not ignore local IPs + config.upnp_enabled = false; + + let beacon_processor_channels = + BeaconProcessorChannels::new(&BeaconProcessorConfig::default()); + NetworkService::build( + beacon_chain.clone(), + &config, + executor.clone(), + None, + beacon_processor_channels.beacon_processor_tx, + beacon_processor_channels.work_reprocessing_tx, + ) + .await + .unwrap() + }); + + // Subscribe to the topics. + runtime.block_on(async { + while network_globals.gossipsub_subscriptions.read().len() < 2 { + if let Some(msg) = network_service.attestation_service.next().await { + network_service.on_attestation_service_msg(msg); + } + } + }); + + // Make sure the service is subscribed to the topics. + let (old_topic1, old_topic2) = { + let mut subnets = SubnetId::compute_subnets_for_epoch::( + network_globals.local_enr().node_id().raw().into(), + beacon_chain.epoch().unwrap(), + &spec, + ) + .unwrap() + .0 + .collect::>(); + assert_eq!(2, subnets.len()); + + let old_fork_digest = beacon_chain.enr_fork_id().fork_digest; + let old_topic1 = GossipTopic::new( + GossipKind::Attestation(subnets.pop().unwrap()), + GossipEncoding::SSZSnappy, + old_fork_digest, + ); + let old_topic2 = GossipTopic::new( + GossipKind::Attestation(subnets.pop().unwrap()), + GossipEncoding::SSZSnappy, + old_fork_digest, + ); + + (old_topic1, old_topic2) + }; + let subscriptions = network_globals.gossipsub_subscriptions.read().clone(); + assert_eq!(2, subscriptions.len()); + assert!(subscriptions.contains(&old_topic1)); + assert!(subscriptions.contains(&old_topic2)); + let old_topic_params1 = network_service + .get_topic_params(old_topic1.clone()) + .expect("topic score params"); + assert!(old_topic_params1.topic_weight > 0.0); + let old_topic_params2 = network_service + .get_topic_params(old_topic2.clone()) + .expect("topic score params"); + assert!(old_topic_params2.topic_weight > 0.0); + + // Advance slot to the next fork + for _ in 0..MinimalEthSpec::slots_per_epoch() { + beacon_chain.slot_clock.advance_slot(); + } + + // Run `NetworkService::update_next_fork()`. + runtime.block_on(async { + network_service.update_next_fork(); + }); + + // Check that topic_weight on the old topics has been zeroed. + let old_topic_params1 = network_service + .get_topic_params(old_topic1) + .expect("topic score params"); + assert_eq!(0.0, old_topic_params1.topic_weight); + + let old_topic_params2 = network_service + .get_topic_params(old_topic2) + .expect("topic score params"); + assert_eq!(0.0, old_topic_params2.topic_weight); + } }