Remove deficit gossipsub scoring during topic transition (#4486)
## Issue Addressed

This PR closes https://github.com/sigp/lighthouse/issues/3237

## Proposed Changes

Remove the topic weight of old-fork topics when the fork happens, so that peers still meshed on those topics are not penalized for no longer sending us messages on them.

## Additional Info

- Divided `NetworkService::start()` into `NetworkService::build()` and `NetworkService::start()` for ease of testing.
Parent: 6ec649a4e2
Commit: ba8bcf4bd3
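Background for the title: under gossipsub v1.1 scoring, each topic contributes `topic_weight × (sum of weighted components)` to a peer's score, and one of those components is a penalty proportional to the square of the peer's mesh message delivery deficit on that topic. After a fork, peers that stay in the mesh of an old topic deliver nothing there, so the deficit grows and their score decays unless the old topic's weight is zeroed. A reduced, illustrative model of just that term (not the actual rust-libp2p formula, which has more components):

```rust
// Illustrative only: models just the mesh-message-delivery ("deficit") penalty
// of the gossipsub v1.1 per-topic score. Names here are not Lighthouse APIs.
fn per_topic_score(topic_weight: f64, delivery_deficit: f64, delivery_weight: f64) -> f64 {
    // penalty = deficit^2, scaled by a negative delivery weight and the topic weight
    topic_weight * delivery_weight * delivery_deficit * delivery_deficit
}

fn main() {
    // A peer still meshed on a pre-fork topic stops delivering messages there,
    // so with a non-zero topic weight the old topic drags its score down...
    let before = per_topic_score(0.5, 8.0, -1.0);
    assert!(before < 0.0);

    // ...while zeroing the old topic's weight removes that contribution entirely,
    // which is what this change does at the fork boundary.
    let after = per_topic_score(0.0, 8.0, -1.0);
    assert_eq!(after, 0.0);
}
```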
@@ -26,6 +26,7 @@ use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettin
 use libp2p::bandwidth::BandwidthSinks;
 use libp2p::gossipsub::{
     self, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError,
+    TopicScoreParams,
 };
 use libp2p::identify;
 use libp2p::multiaddr::{Multiaddr, Protocol as MProtocol};
@@ -618,6 +619,38 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
         }
     }
 
+    /// Remove topic weight from all topics that don't have the given fork digest.
+    pub fn remove_topic_weight_except(&mut self, except: [u8; 4]) {
+        let new_param = TopicScoreParams {
+            topic_weight: 0.0,
+            ..Default::default()
+        };
+        let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
+        for topic in subscriptions
+            .iter()
+            .filter(|topic| topic.fork_digest != except)
+        {
+            let libp2p_topic: Topic = topic.clone().into();
+            match self
+                .gossipsub_mut()
+                .set_topic_params(libp2p_topic, new_param.clone())
+            {
+                Ok(_) => debug!(self.log, "Removed topic weight"; "topic" => %topic),
+                Err(e) => {
+                    warn!(self.log, "Failed to remove topic weight"; "topic" => %topic, "error" => e)
+                }
+            }
+        }
+    }
+
+    /// Returns the scoring parameters for a topic if set.
+    pub fn get_topic_params(&self, topic: GossipTopic) -> Option<&TopicScoreParams> {
+        self.swarm
+            .behaviour()
+            .gossipsub
+            .get_topic_params(&topic.into())
+    }
+
     /// Subscribes to a gossipsub topic.
     ///
     /// Returns `true` if the subscription was successful and `false` otherwise.
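For reference, `remove_topic_weight_except` above ultimately relies on rust-libp2p's `set_topic_params`, which only succeeds once peer scoring has been enabled on the behaviour. A minimal standalone sketch of that call, assuming a recent rust-libp2p where the gossipsub behaviour type is `gossipsub::Behaviour` (the topic string is purely illustrative):

```rust
use libp2p::gossipsub::{
    Behaviour, ConfigBuilder, IdentTopic, MessageAuthenticity, PeerScoreParams,
    PeerScoreThresholds, TopicScoreParams,
};
use libp2p::identity::Keypair;

fn main() -> Result<(), &'static str> {
    // Build a gossipsub behaviour and enable peer scoring; `set_topic_params`
    // returns an error if scoring was never initialised.
    let keypair = Keypair::generate_ed25519();
    let config = ConfigBuilder::default().build().map_err(|_| "invalid config")?;
    let mut gossipsub: Behaviour = Behaviour::new(MessageAuthenticity::Signed(keypair), config)?;
    gossipsub.with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default())?;

    // Zero the weight of a pre-fork topic: every per-topic score component,
    // including the delivery-deficit penalty, stops affecting peer scores.
    let old_topic = IdentTopic::new("/eth2/00000000/beacon_attestation_0/ssz_snappy");
    gossipsub.set_topic_params(
        old_topic,
        TopicScoreParams {
            topic_weight: 0.0,
            ..Default::default()
        },
    )?;
    Ok(())
}
```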
@@ -215,15 +215,18 @@ pub struct NetworkService<T: BeaconChainTypes> {
 }
 
 impl<T: BeaconChainTypes> NetworkService<T> {
-    #[allow(clippy::type_complexity)]
-    pub async fn start(
+    async fn build(
         beacon_chain: Arc<BeaconChain<T>>,
         config: &NetworkConfig,
         executor: task_executor::TaskExecutor,
         gossipsub_registry: Option<&'_ mut Registry>,
         beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
         beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
-    ) -> error::Result<(Arc<NetworkGlobals<T::EthSpec>>, NetworkSenders<T::EthSpec>)> {
+    ) -> error::Result<(
+        NetworkService<T>,
+        Arc<NetworkGlobals<T::EthSpec>>,
+        NetworkSenders<T::EthSpec>,
+    )> {
         let network_log = executor.log().clone();
         // build the channels for external comms
         let (network_senders, network_recievers) = NetworkSenders::new();
@@ -369,6 +372,28 @@ impl<T: BeaconChainTypes> NetworkService<T> {
             enable_light_client_server: config.enable_light_client_server,
         };
 
+        Ok((network_service, network_globals, network_senders))
+    }
+
+    #[allow(clippy::type_complexity)]
+    pub async fn start(
+        beacon_chain: Arc<BeaconChain<T>>,
+        config: &NetworkConfig,
+        executor: task_executor::TaskExecutor,
+        gossipsub_registry: Option<&'_ mut Registry>,
+        beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
+        beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
+    ) -> error::Result<(Arc<NetworkGlobals<T::EthSpec>>, NetworkSenders<T::EthSpec>)> {
+        let (network_service, network_globals, network_senders) = Self::build(
+            beacon_chain,
+            config,
+            executor.clone(),
+            gossipsub_registry,
+            beacon_processor_send,
+            beacon_processor_reprocess_tx,
+        )
+        .await?;
+
         network_service.spawn_service(executor);
 
         Ok((network_globals, network_senders))
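The `build()`/`start()` split mentioned in the PR description follows a common testability pattern: the constructor returns the not-yet-spawned service plus the handles callers need, and the production entry point builds and then spawns. A schematic sketch with hypothetical types, not the Lighthouse signatures:

```rust
// Hypothetical types for illustration; the real service owns channels, the swarm, etc.
struct Service {
    forks_handled: u64,
}

struct Handle; // stand-in for the globals/senders handed back to callers

impl Service {
    // Construct everything without spawning a task (what `build` does).
    fn build() -> (Service, Handle) {
        (Service { forks_handled: 0 }, Handle)
    }

    // Production path (what `start` does): build, then hand the service off.
    fn start(spawn: impl FnOnce(Service)) -> Handle {
        let (service, handle) = Service::build();
        spawn(service); // e.g. spawn the event loop in real code
        handle
    }

    // Internal transition that a test can now drive synchronously.
    fn on_fork(&mut self) {
        self.forks_handled += 1;
    }
}

fn main() {
    // Test: keep the un-spawned service and poke its internals directly,
    // the way the new test calls `update_next_fork()` on the built service.
    let (mut service, _handle) = Service::build();
    service.on_fork();
    assert_eq!(service.forks_handled, 1);

    // Production: only the handle escapes; the service runs in the background.
    let _handle = Service::start(|_service| { /* spawn the event loop here */ });
}
```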
@@ -882,9 +907,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
 
     fn update_next_fork(&mut self) {
         let new_enr_fork_id = self.beacon_chain.enr_fork_id();
+        let new_fork_digest = new_enr_fork_id.fork_digest;
 
         let fork_context = &self.fork_context;
-        if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) {
+        if let Some(new_fork_name) = fork_context.from_context_bytes(new_fork_digest) {
             info!(
                 self.log,
                 "Transitioned to new fork";
@@ -907,6 +933,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
                 Box::pin(next_fork_subscriptions_delay(&self.beacon_chain).into());
             self.next_unsubscribe = Box::pin(Some(tokio::time::sleep(unsubscribe_delay)).into());
             info!(self.log, "Network will unsubscribe from old fork gossip topics in a few epochs"; "remaining_epochs" => UNSUBSCRIBE_DELAY_EPOCHS);
+
+            // Remove topic weight from old fork topics to prevent peers that left on the mesh on
+            // old topics from being penalized for not sending us messages.
+            self.libp2p.remove_topic_weight_except(new_fork_digest);
         } else {
             crit!(self.log, "Unknown new enr fork id"; "new_fork_id" => ?new_enr_fork_id);
         }
@@ -4,14 +4,26 @@ mod tests {
     use crate::persisted_dht::load_dht;
     use crate::{NetworkConfig, NetworkService};
     use beacon_chain::test_utils::BeaconChainHarness;
-    use beacon_processor::BeaconProcessorChannels;
-    use lighthouse_network::Enr;
+    use beacon_chain::BeaconChainTypes;
+    use beacon_processor::{BeaconProcessorChannels, BeaconProcessorConfig};
+    use futures::StreamExt;
+    use lighthouse_network::types::{GossipEncoding, GossipKind};
+    use lighthouse_network::{Enr, GossipTopic};
     use slog::{o, Drain, Level, Logger};
     use sloggers::{null::NullLoggerBuilder, Build};
     use std::str::FromStr;
     use std::sync::Arc;
     use tokio::runtime::Runtime;
-    use types::MinimalEthSpec;
+    use types::{Epoch, EthSpec, ForkName, MinimalEthSpec, SubnetId};
+
+    impl<T: BeaconChainTypes> NetworkService<T> {
+        fn get_topic_params(
+            &self,
+            topic: GossipTopic,
+        ) -> Option<&lighthouse_network::libp2p::gossipsub::TopicScoreParams> {
+            self.libp2p.get_topic_params(topic)
+        }
+    }
+
     fn get_logger(actual_log: bool) -> Logger {
         if actual_log {
@@ -102,4 +114,126 @@ mod tests {
             "should have persisted the second ENR to store"
         );
     }
+
+    // Test removing topic weight on old topics when a fork happens.
+    #[test]
+    fn test_removing_topic_weight_on_old_topics() {
+        let runtime = Arc::new(Runtime::new().unwrap());
+
+        // Capella spec
+        let mut spec = MinimalEthSpec::default_spec();
+        spec.altair_fork_epoch = Some(Epoch::new(0));
+        spec.bellatrix_fork_epoch = Some(Epoch::new(0));
+        spec.capella_fork_epoch = Some(Epoch::new(1));
+
+        // Build beacon chain.
+        let beacon_chain = BeaconChainHarness::builder(MinimalEthSpec)
+            .spec(spec.clone())
+            .deterministic_keypairs(8)
+            .fresh_ephemeral_store()
+            .mock_execution_layer()
+            .build()
+            .chain;
+        let (next_fork_name, _) = beacon_chain.duration_to_next_fork().expect("next fork");
+        assert_eq!(next_fork_name, ForkName::Capella);
+
+        // Build network service.
+        let (mut network_service, network_globals, _network_senders) = runtime.block_on(async {
+            let (_, exit) = exit_future::signal();
+            let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
+            let executor = task_executor::TaskExecutor::new(
+                Arc::downgrade(&runtime),
+                exit,
+                get_logger(false),
+                shutdown_tx,
+            );
+
+            let mut config = NetworkConfig::default();
+            config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21214, 21214, 21215);
+            config.discv5_config.table_filter = |_| true; // Do not ignore local IPs
+            config.upnp_enabled = false;
+
+            let beacon_processor_channels =
+                BeaconProcessorChannels::new(&BeaconProcessorConfig::default());
+            NetworkService::build(
+                beacon_chain.clone(),
+                &config,
+                executor.clone(),
+                None,
+                beacon_processor_channels.beacon_processor_tx,
+                beacon_processor_channels.work_reprocessing_tx,
+            )
+            .await
+            .unwrap()
+        });
+
+        // Subscribe to the topics.
+        runtime.block_on(async {
+            while network_globals.gossipsub_subscriptions.read().len() < 2 {
+                if let Some(msg) = network_service.attestation_service.next().await {
+                    network_service.on_attestation_service_msg(msg);
+                }
+            }
+        });
+
+        // Make sure the service is subscribed to the topics.
+        let (old_topic1, old_topic2) = {
+            let mut subnets = SubnetId::compute_subnets_for_epoch::<MinimalEthSpec>(
+                network_globals.local_enr().node_id().raw().into(),
+                beacon_chain.epoch().unwrap(),
+                &spec,
+            )
+            .unwrap()
+            .0
+            .collect::<Vec<_>>();
+            assert_eq!(2, subnets.len());
+
+            let old_fork_digest = beacon_chain.enr_fork_id().fork_digest;
+            let old_topic1 = GossipTopic::new(
+                GossipKind::Attestation(subnets.pop().unwrap()),
+                GossipEncoding::SSZSnappy,
+                old_fork_digest,
+            );
+            let old_topic2 = GossipTopic::new(
+                GossipKind::Attestation(subnets.pop().unwrap()),
+                GossipEncoding::SSZSnappy,
+                old_fork_digest,
+            );
+
+            (old_topic1, old_topic2)
+        };
+        let subscriptions = network_globals.gossipsub_subscriptions.read().clone();
+        assert_eq!(2, subscriptions.len());
+        assert!(subscriptions.contains(&old_topic1));
+        assert!(subscriptions.contains(&old_topic2));
+        let old_topic_params1 = network_service
+            .get_topic_params(old_topic1.clone())
+            .expect("topic score params");
+        assert!(old_topic_params1.topic_weight > 0.0);
+        let old_topic_params2 = network_service
+            .get_topic_params(old_topic2.clone())
+            .expect("topic score params");
+        assert!(old_topic_params2.topic_weight > 0.0);
+
+        // Advance slot to the next fork
+        for _ in 0..MinimalEthSpec::slots_per_epoch() {
+            beacon_chain.slot_clock.advance_slot();
+        }
+
+        // Run `NetworkService::update_next_fork()`.
+        runtime.block_on(async {
+            network_service.update_next_fork();
+        });
+
+        // Check that topic_weight on the old topics has been zeroed.
+        let old_topic_params1 = network_service
+            .get_topic_params(old_topic1)
+            .expect("topic score params");
+        assert_eq!(0.0, old_topic_params1.topic_weight);
+
+        let old_topic_params2 = network_service
+            .get_topic_params(old_topic2)
+            .expect("topic score params");
+        assert_eq!(0.0, old_topic_params2.topic_weight);
+    }
 }
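Assuming the crate keeps its `network` package name, the new case can be run on its own with `cargo test -p network test_removing_topic_weight_on_old_topics`; because it goes through `NetworkService::build()` rather than `start()`, no background service task is spawned and the test can call `update_next_fork()` directly.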