From 5b18fd92cb96cb55885b4d4a2d5f47328bd24167 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 1 Mar 2023 09:22:48 +0000 Subject: [PATCH] Cleaner logic for gossip subscriptions for new forks (#4030) ## Issue Addressed Cleaner resolution for #4006 ## Proposed Changes We are currently subscribing to core topics of new forks way before the actual fork since we had just a single `CORE_TOPICS` array. This PR separates the core topics for every fork and subscribes to only required topics based on the current fork. Also adds logic for subscribing to the core topics of a new fork only 2 slots before the fork happens. 2 slots is to give enough time for the gossip meshes to form. Currently doesn't add logic to remove topics from older forks in new forks. For e.g. in the coupled 4844 world, we had to remove the `BeaconBlock` topic in favour of `BeaconBlocksAndBlobsSidecar` at the 4844 fork. It should be easy enough to add though. Not adding it because I'm assuming that #4019 will get merged before this PR and we won't require any deletion logic. Happy to add it regardless though. --- .../lighthouse_network/src/service/mod.rs | 16 +++++-- .../lighthouse_network/src/types/mod.rs | 4 +- .../lighthouse_network/src/types/topics.rs | 43 +++++++++++++++++-- beacon_node/network/src/service.rs | 6 +-- 4 files changed, 56 insertions(+), 13 deletions(-) diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 0110eb958..e20b86e54 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -13,8 +13,8 @@ use crate::rpc::*; use crate::service::behaviour::BehaviourEvent; pub use crate::service::behaviour::Gossipsub; use crate::types::{ - subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, - SubnetDiscovery, + fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, + SnappyTransform, Subnet, SubnetDiscovery, }; use crate::EnrExt; use crate::Eth2Enr; @@ -41,6 +41,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; +use types::ForkName; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId, }; @@ -559,13 +560,20 @@ impl Network { self.unsubscribe(gossip_topic) } - /// Subscribe to all currently subscribed topics with the new fork digest. - pub fn subscribe_new_fork_topics(&mut self, new_fork_digest: [u8; 4]) { + /// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`. + pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) { + // Subscribe to existing topics with new fork digest let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone(); for mut topic in subscriptions.into_iter() { topic.fork_digest = new_fork_digest; self.subscribe(topic); } + + // Subscribe to core topics for the new fork + for kind in fork_core_topics(&new_fork) { + let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest); + self.subscribe(topic); + } } /// Unsubscribe from all topics that doesn't have the given fork_digest diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index 2a5ca6c80..e7457f25d 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -17,6 +17,6 @@ pub use pubsub::{PubsubMessage, SnappyTransform}; pub use subnet::{Subnet, SubnetDiscovery}; pub use sync_state::{BackFillState, SyncState}; pub use topics::{ - subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS, - LIGHT_CLIENT_GOSSIP_TOPICS, + core_topics_to_subscribe, fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, + GossipTopic, LIGHT_CLIENT_GOSSIP_TOPICS, }; diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 8194fa63b..0e4aefbb5 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -1,7 +1,7 @@ use libp2p::gossipsub::{IdentTopic as Topic, TopicHash}; use serde_derive::{Deserialize, Serialize}; use strum::AsRefStr; -use types::{SubnetId, SyncSubnetId}; +use types::{ForkName, SubnetId, SyncSubnetId}; use crate::Subnet; @@ -22,21 +22,45 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change"; pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; -pub const CORE_TOPICS: [GossipKind; 7] = [ +pub const BASE_CORE_TOPICS: [GossipKind; 5] = [ GossipKind::BeaconBlock, GossipKind::BeaconAggregateAndProof, GossipKind::VoluntaryExit, GossipKind::ProposerSlashing, GossipKind::AttesterSlashing, - GossipKind::SignedContributionAndProof, - GossipKind::BlsToExecutionChange, ]; +pub const ALTAIR_CORE_TOPICS: [GossipKind; 1] = [GossipKind::SignedContributionAndProof]; + +pub const CAPELLA_CORE_TOPICS: [GossipKind; 1] = [GossipKind::BlsToExecutionChange]; + pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [ GossipKind::LightClientFinalityUpdate, GossipKind::LightClientOptimisticUpdate, ]; +/// Returns the core topics associated with each fork that are new to the previous fork +pub fn fork_core_topics(fork_name: &ForkName) -> Vec { + match fork_name { + ForkName::Base => BASE_CORE_TOPICS.to_vec(), + ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(), + ForkName::Merge => vec![], + ForkName::Capella => CAPELLA_CORE_TOPICS.to_vec(), + } +} + +/// Returns all the topics that we need to subscribe to for a given fork +/// including topics from older forks and new topics for the current fork. +pub fn core_topics_to_subscribe(mut current_fork: ForkName) -> Vec { + let mut topics = fork_core_topics(¤t_fork); + while let Some(previous_fork) = current_fork.previous_fork() { + let previous_fork_topics = fork_core_topics(&previous_fork); + topics.extend(previous_fork_topics); + current_fork = previous_fork; + } + topics +} + /// A gossipsub topic which encapsulates the type of messages that should be sent and received over /// the pubsub protocol and the way the messages should be encoded. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] @@ -390,4 +414,15 @@ mod tests { assert_eq!("proposer_slashing", ProposerSlashing.as_ref()); assert_eq!("attester_slashing", AttesterSlashing.as_ref()); } + + #[test] + fn test_core_topics_to_subscribe() { + let mut all_topics = Vec::new(); + all_topics.extend(CAPELLA_CORE_TOPICS); + all_topics.extend(ALTAIR_CORE_TOPICS); + all_topics.extend(BASE_CORE_TOPICS); + + let latest_fork = *ForkName::list_all().last().unwrap(); + assert_eq!(core_topics_to_subscribe(latest_fork), all_topics); + } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 4568ed1a2..410461bcd 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -19,7 +19,7 @@ use lighthouse_network::{ Context, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, Subnet, }; use lighthouse_network::{ - types::{GossipEncoding, GossipTopic}, + types::{core_topics_to_subscribe, GossipEncoding, GossipTopic}, MessageId, NetworkEvent, NetworkGlobals, PeerId, }; use slog::{crit, debug, error, info, o, trace, warn}; @@ -445,7 +445,7 @@ impl NetworkService { let fork_version = self.beacon_chain.spec.fork_version_for_name(fork_name); let fork_digest = ChainSpec::compute_fork_digest(fork_version, self.beacon_chain.genesis_validators_root); info!(self.log, "Subscribing to new fork topics"); - self.libp2p.subscribe_new_fork_topics(fork_digest); + self.libp2p.subscribe_new_fork_topics(fork_name, fork_digest); self.next_fork_subscriptions = Box::pin(None.into()); } else { @@ -684,7 +684,7 @@ impl NetworkService { } let mut subscribed_topics: Vec = vec![]; - for topic_kind in lighthouse_network::types::CORE_TOPICS.iter() { + for topic_kind in core_topics_to_subscribe(self.fork_context.current_fork()) { for fork_digest in self.required_gossip_fork_digests() { let topic = GossipTopic::new( topic_kind.clone(),