Correct the metrics for topic subscriptions (#5344)
* Handle fork boundaries * Merge latest unstable * Topic subscription fix
This commit is contained in:
parent
fc8f1a4ca7
commit
85c3204d70
@ -854,6 +854,13 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register topics to ensure metrics are recorded correctly for these topics.
|
||||||
|
pub fn register_topics_for_metrics(&mut self, topics: Vec<TopicHash>) {
|
||||||
|
if let Some(metrics) = &mut self.metrics {
|
||||||
|
metrics.register_allowed_topics(topics);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Adds a new peer to the list of explicitly connected peers.
|
/// Adds a new peer to the list of explicitly connected peers.
|
||||||
pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
|
pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
|
||||||
tracing::debug!(peer=%peer_id, "Adding explicit peer");
|
tracing::debug!(peer=%peer_id, "Adding explicit peer");
|
||||||
|
@ -38,7 +38,7 @@ const DEFAULT_MAX_TOPICS: usize = 300;
|
|||||||
|
|
||||||
// Default value that limits how many topics for which there has never been a subscription do we
|
// Default value that limits how many topics for which there has never been a subscription do we
|
||||||
// store metrics.
|
// store metrics.
|
||||||
const DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS: usize = 50;
|
const DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS: usize = 100;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
@ -392,13 +392,21 @@ impl Metrics {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Increase the number of peers do we known are subscribed to this topic.
|
/// Registers a set of topics that we want to store calculate metrics for.
|
||||||
|
pub(crate) fn register_allowed_topics(&mut self, topics: Vec<TopicHash>) {
|
||||||
|
for topic_hash in topics {
|
||||||
|
self.topic_info.insert(topic_hash, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Increase the number of peers that are subscribed to this topic.
|
||||||
pub(crate) fn inc_topic_peers(&mut self, topic: &TopicHash) {
|
pub(crate) fn inc_topic_peers(&mut self, topic: &TopicHash) {
|
||||||
if self.register_topic(topic).is_ok() {
|
if self.register_topic(topic).is_ok() {
|
||||||
self.topic_peers_count.get_or_create(topic).inc();
|
self.topic_peers_count.get_or_create(topic).inc();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Decrease the number of peers that are subscribed to this topic.
|
||||||
pub(crate) fn dec_topic_peers(&mut self, topic: &TopicHash) {
|
pub(crate) fn dec_topic_peers(&mut self, topic: &TopicHash) {
|
||||||
if self.register_topic(topic).is_ok() {
|
if self.register_topic(topic).is_ok() {
|
||||||
self.topic_peers_count.get_or_create(topic).dec();
|
self.topic_peers_count.get_or_create(topic).dec();
|
||||||
|
@ -18,9 +18,9 @@ use crate::rpc::*;
|
|||||||
use crate::service::behaviour::BehaviourEvent;
|
use crate::service::behaviour::BehaviourEvent;
|
||||||
pub use crate::service::behaviour::Gossipsub;
|
pub use crate::service::behaviour::Gossipsub;
|
||||||
use crate::types::{
|
use crate::types::{
|
||||||
fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic,
|
attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding,
|
||||||
SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS,
|
GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS,
|
||||||
CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
|
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
|
||||||
};
|
};
|
||||||
use crate::EnrExt;
|
use crate::EnrExt;
|
||||||
use crate::Eth2Enr;
|
use crate::Eth2Enr;
|
||||||
@ -275,6 +275,22 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
|
|||||||
.with_peer_score(params, thresholds)
|
.with_peer_score(params, thresholds)
|
||||||
.expect("Valid score params and thresholds");
|
.expect("Valid score params and thresholds");
|
||||||
|
|
||||||
|
// If we are using metrics, then register which topics we want to make sure to keep
|
||||||
|
// track of
|
||||||
|
if ctx.libp2p_registry.is_some() {
|
||||||
|
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<TSpec>()
|
||||||
|
.map(|gossip_kind| {
|
||||||
|
Topic::from(GossipTopic::new(
|
||||||
|
gossip_kind,
|
||||||
|
GossipEncoding::default(),
|
||||||
|
enr_fork_id.fork_digest,
|
||||||
|
))
|
||||||
|
.into()
|
||||||
|
})
|
||||||
|
.collect::<Vec<TopicHash>>();
|
||||||
|
gossipsub.register_topics_for_metrics(topics_to_keep_metrics_for);
|
||||||
|
}
|
||||||
|
|
||||||
(gossipsub, update_gossipsub_scores)
|
(gossipsub, update_gossipsub_scores)
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -640,6 +656,20 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
|
|||||||
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
|
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
|
||||||
self.subscribe(topic);
|
self.subscribe(topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register the new topics for metrics
|
||||||
|
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<TSpec>()
|
||||||
|
.map(|gossip_kind| {
|
||||||
|
Topic::from(GossipTopic::new(
|
||||||
|
gossip_kind,
|
||||||
|
GossipEncoding::default(),
|
||||||
|
new_fork_digest,
|
||||||
|
))
|
||||||
|
.into()
|
||||||
|
})
|
||||||
|
.collect::<Vec<TopicHash>>();
|
||||||
|
self.gossipsub_mut()
|
||||||
|
.register_topics_for_metrics(topics_to_keep_metrics_for);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Unsubscribe from all topics that doesn't have the given fork_digest
|
/// Unsubscribe from all topics that doesn't have the given fork_digest
|
||||||
|
@ -17,7 +17,7 @@ pub use pubsub::{PubsubMessage, SnappyTransform};
|
|||||||
pub use subnet::{Subnet, SubnetDiscovery};
|
pub use subnet::{Subnet, SubnetDiscovery};
|
||||||
pub use sync_state::{BackFillState, SyncState};
|
pub use sync_state::{BackFillState, SyncState};
|
||||||
pub use topics::{
|
pub use topics::{
|
||||||
core_topics_to_subscribe, fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind,
|
attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics,
|
||||||
GossipTopic, ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS,
|
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, ALTAIR_CORE_TOPICS,
|
||||||
LIGHT_CLIENT_GOSSIP_TOPICS,
|
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
|
||||||
};
|
};
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
use crate::gossipsub::{IdentTopic as Topic, TopicHash};
|
use crate::gossipsub::{IdentTopic as Topic, TopicHash};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use strum::AsRefStr;
|
use strum::AsRefStr;
|
||||||
use types::{ChainSpec, EthSpec, ForkName, SubnetId, SyncSubnetId};
|
use types::{ChainSpec, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned};
|
||||||
|
|
||||||
use crate::Subnet;
|
use crate::Subnet;
|
||||||
|
|
||||||
@ -62,6 +62,17 @@ pub fn fork_core_topics<T: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> V
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns all the attestation and sync committee topics, for a given fork.
|
||||||
|
pub fn attestation_sync_committee_topics<TSpec: EthSpec>() -> impl Iterator<Item = GossipKind> {
|
||||||
|
(0..TSpec::SubnetBitfieldLength::to_usize())
|
||||||
|
.map(|subnet_id| GossipKind::Attestation(SubnetId::new(subnet_id as u64)))
|
||||||
|
.chain(
|
||||||
|
(0..TSpec::SyncCommitteeSubnetCount::to_usize()).map(|sync_committee_id| {
|
||||||
|
GossipKind::SyncCommitteeMessage(SyncSubnetId::new(sync_committee_id as u64))
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns all the topics that we need to subscribe to for a given fork
|
/// Returns all the topics that we need to subscribe to for a given fork
|
||||||
/// including topics from older forks and new topics for the current fork.
|
/// including topics from older forks and new topics for the current fork.
|
||||||
pub fn core_topics_to_subscribe<T: EthSpec>(
|
pub fn core_topics_to_subscribe<T: EthSpec>(
|
||||||
|
Loading…
Reference in New Issue
Block a user