Retry gossipsub messages when insufficient peers (#2964)

## Issue Addressed
#2947 

## Proposed Changes

Store messages that fail to be published due to insufficient peers so that they can be retried later. Messages expire after half an epoch and are retried if gossipsub informs us that a useful peer has connected. Currently running in Atlanta.
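
For a sense of scale: assuming mainnet spec values (12-second slots, 32 slots per epoch), "half an epoch" comes out to 192 seconds. A minimal sketch of the computation used in the `Behaviour` constructor diff below:

```rust
use std::time::Duration;

fn main() {
    // Mainnet spec values, assumed here for illustration.
    let seconds_per_slot: u64 = 12;
    let slots_per_epoch: u64 = 32;

    // Mirrors the "half an epoch" retry window computed in the diff below:
    // seconds_per_slot * slots_per_epoch / 2
    let gossip_max_retry_delay =
        Duration::from_secs(seconds_per_slot * slots_per_epoch / 2);

    assert_eq!(gossip_max_retry_delay, Duration::from_secs(192));
}
```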

## Additional Info
If sending the messages fails again on retry, they will not be tried another time.
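
Roughly, the control flow added here is: when `publish` fails with `InsufficientPeers`, cache the encoded message under its topic; when gossipsub reports that a peer subscribed to that topic, drain the cache and publish once more. A self-contained toy sketch of that flow (the `PublishError` enum and `publish` helper here are illustrative stand-ins, not the real libp2p / `lighthouse_network` API):

```rust
use std::collections::HashMap;

// Illustrative stand-in for libp2p's publish error; only the variant this
// PR cares about is modelled.
enum PublishError {
    InsufficientPeers,
}

// Toy publish function: fails when no peers are subscribed to the topic.
fn publish(_topic: &str, _data: &[u8], peers_on_topic: usize) -> Result<(), PublishError> {
    if peers_on_topic == 0 {
        Err(PublishError::InsufficientPeers)
    } else {
        Ok(())
    }
}

fn main() {
    // Per-topic cache of messages that could not be published yet.
    let mut cache: HashMap<String, Vec<Vec<u8>>> = HashMap::new();
    let topic = "beacon_attestation_1".to_string();
    let message = vec![0u8; 4];

    // First attempt fails for lack of peers: store the message for later.
    if let Err(PublishError::InsufficientPeers) = publish(&topic, &message, 0) {
        cache.entry(topic.clone()).or_default().push(message);
    }

    // A peer subscribes to the topic later: drain the cache and retry once.
    // If this retry fails, the message is dropped (see note above).
    if let Some(msgs) = cache.remove(&topic) {
        for data in msgs {
            let _ = publish(&topic, &data, 1);
        }
    }
}
```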
Divma 2022-02-03 01:12:30 +00:00
parent 0177b9286e
commit 615695776e
4 changed files with 349 additions and 16 deletions

View File

@@ -0,0 +1,247 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use crate::types::GossipKind;
use crate::GossipTopic;
use tokio_util::time::delay_queue::{DelayQueue, Key};
/// Store of gossip messages that we failed to publish and will try again later. By default, all
/// messages are ignored. This behaviour can be changed using `GossipCacheBuilder::default_timeout`
/// to apply the same delay to every kind. Individual timeouts for specific kinds can be set and
/// will overwrite the default_timeout if present.
pub struct GossipCache {
/// Expire timeouts for each topic-msg pair.
expirations: DelayQueue<(GossipTopic, Vec<u8>)>,
/// Messages cached for each topic.
topic_msgs: HashMap<GossipTopic, HashMap<Vec<u8>, Key>>,
/// Timeout for blocks.
beacon_block: Option<Duration>,
/// Timeout for aggregate attestations.
aggregates: Option<Duration>,
/// Timeout for attestations.
attestation: Option<Duration>,
/// Timeout for voluntary exits.
voluntary_exit: Option<Duration>,
/// Timeout for proposer slashings.
proposer_slashing: Option<Duration>,
/// Timeout for attester slashings.
attester_slashing: Option<Duration>,
/// Timeout for aggregated sync committee signatures.
signed_contribution_and_proof: Option<Duration>,
/// Timeout for sync committee messages.
sync_committee_message: Option<Duration>,
}
#[derive(Default)]
pub struct GossipCacheBuilder {
default_timeout: Option<Duration>,
/// Timeout for blocks.
beacon_block: Option<Duration>,
/// Timeout for aggregate attestations.
aggregates: Option<Duration>,
/// Timeout for attestations.
attestation: Option<Duration>,
/// Timeout for voluntary exits.
voluntary_exit: Option<Duration>,
/// Timeout for proposer slashings.
proposer_slashing: Option<Duration>,
/// Timeout for attester slashings.
attester_slashing: Option<Duration>,
/// Timeout for aggregated sync committee signatures.
signed_contribution_and_proof: Option<Duration>,
/// Timeout for sync committee messages.
sync_committee_message: Option<Duration>,
}
#[allow(dead_code)]
impl GossipCacheBuilder {
/// By default, all timeouts are disabled. Setting a default timeout will enable all timeouts
/// that are not already set.
pub fn default_timeout(mut self, timeout: Duration) -> Self {
self.default_timeout = Some(timeout);
self
}
/// Timeout for blocks.
pub fn beacon_block_timeout(mut self, timeout: Duration) -> Self {
self.beacon_block = Some(timeout);
self
}
/// Timeout for aggregate attestations.
pub fn aggregates_timeout(mut self, timeout: Duration) -> Self {
self.aggregates = Some(timeout);
self
}
/// Timeout for attestations.
pub fn attestation_timeout(mut self, timeout: Duration) -> Self {
self.attestation = Some(timeout);
self
}
/// Timeout for voluntary exits.
pub fn voluntary_exit_timeout(mut self, timeout: Duration) -> Self {
self.voluntary_exit = Some(timeout);
self
}
/// Timeout for proposer slashings.
pub fn proposer_slashing_timeout(mut self, timeout: Duration) -> Self {
self.proposer_slashing = Some(timeout);
self
}
/// Timeout for attester slashings.
pub fn attester_slashing(mut self, timeout: Duration) -> Self {
self.attester_slashing = Some(timeout);
self
}
/// Timeout for aggregated sync committee signatures.
pub fn signed_contribution_and_proof(mut self, timeout: Duration) -> Self {
self.signed_contribution_and_proof = Some(timeout);
self
}
/// Timeout for sync committee messages.
pub fn sync_committee_message(mut self, timeout: Duration) -> Self {
self.sync_committee_message = Some(timeout);
self
}
pub fn build(self) -> GossipCache {
let GossipCacheBuilder {
default_timeout,
beacon_block,
aggregates,
attestation,
voluntary_exit,
proposer_slashing,
attester_slashing,
signed_contribution_and_proof,
sync_committee_message,
} = self;
GossipCache {
expirations: DelayQueue::default(),
topic_msgs: HashMap::default(),
beacon_block: beacon_block.or(default_timeout),
aggregates: aggregates.or(default_timeout),
attestation: attestation.or(default_timeout),
voluntary_exit: voluntary_exit.or(default_timeout),
proposer_slashing: proposer_slashing.or(default_timeout),
attester_slashing: attester_slashing.or(default_timeout),
signed_contribution_and_proof: signed_contribution_and_proof.or(default_timeout),
sync_committee_message: sync_committee_message.or(default_timeout),
}
}
}
impl GossipCache {
/// Get a builder of a `GossipCache`. Topic kinds for which no timeout is defined will be
/// ignored if added in `insert`.
pub fn builder() -> GossipCacheBuilder {
GossipCacheBuilder::default()
}
// Insert a message to be sent later.
pub fn insert(&mut self, topic: GossipTopic, data: Vec<u8>) {
let expire_timeout = match topic.kind() {
GossipKind::BeaconBlock => self.beacon_block,
GossipKind::BeaconAggregateAndProof => self.aggregates,
GossipKind::Attestation(_) => self.attestation,
GossipKind::VoluntaryExit => self.voluntary_exit,
GossipKind::ProposerSlashing => self.proposer_slashing,
GossipKind::AttesterSlashing => self.attester_slashing,
GossipKind::SignedContributionAndProof => self.signed_contribution_and_proof,
GossipKind::SyncCommitteeMessage(_) => self.sync_committee_message,
};
let expire_timeout = match expire_timeout {
Some(expire_timeout) => expire_timeout,
None => return,
};
match self
.topic_msgs
.entry(topic.clone())
.or_default()
.entry(data.clone())
{
Entry::Occupied(key) => self.expirations.reset(key.get(), expire_timeout),
Entry::Vacant(entry) => {
let key = self.expirations.insert((topic, data), expire_timeout);
entry.insert(key);
}
}
}
// Get the registered messages for this topic.
pub fn retrieve(&mut self, topic: &GossipTopic) -> Option<impl Iterator<Item = Vec<u8>> + '_> {
if let Some(msgs) = self.topic_msgs.remove(topic) {
for (_, key) in msgs.iter() {
self.expirations.remove(key);
}
Some(msgs.into_keys())
} else {
None
}
}
}
impl futures::stream::Stream for GossipCache {
type Item = Result<GossipTopic, String>; // We don't care to retrieve the expired data.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.expirations.poll_expired(cx) {
Poll::Ready(Some(Ok(expired))) => {
let expected_key = expired.key();
let (topic, data) = expired.into_inner();
match self.topic_msgs.get_mut(&topic) {
Some(msgs) => {
let key = msgs.remove(&data);
debug_assert_eq!(key, Some(expected_key));
if msgs.is_empty() {
// no more messages for this topic.
self.topic_msgs.remove(&topic);
}
}
None => {
#[cfg(debug_assertions)]
panic!("Topic for registered message is not present.")
}
}
Poll::Ready(Some(Ok(topic)))
}
Poll::Ready(Some(Err(x))) => Poll::Ready(Some(Err(x.to_string()))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(test)]
mod tests {
use crate::types::GossipKind;
use super::*;
use futures::stream::StreamExt;
#[tokio::test]
async fn test_stream() {
let mut cache = GossipCache::builder()
.default_timeout(Duration::from_millis(300))
.build();
let test_topic = GossipTopic::new(
GossipKind::Attestation(1u64.into()),
crate::types::GossipEncoding::SSZSnappy,
[0u8; 4],
);
cache.insert(test_topic, vec![]);
tokio::time::sleep(Duration::from_millis(300)).await;
while cache.next().await.is_some() {}
assert!(cache.expirations.is_empty());
assert!(cache.topic_msgs.is_empty());
}
}

View File

@@ -15,6 +15,8 @@ use crate::types::{
};
use crate::Eth2Enr;
use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
use futures::stream::StreamExt;
use libp2p::gossipsub::error::PublishError;
use libp2p::{
core::{
connection::ConnectionId, identity::Keypair, multiaddr::Protocol as MProtocol, Multiaddr,
@@ -50,6 +52,9 @@ use types::{
SignedBeaconBlock, Slot, SubnetId, SyncSubnetId,
};
use self::gossip_cache::GossipCache;
mod gossip_cache;
pub mod gossipsub_scoring_parameters;
/// The number of peers we target per subnet for discovery queries.
@@ -177,6 +182,8 @@ pub struct Behaviour<TSpec: EthSpec> {
/// The interval for updating gossipsub scores
#[behaviour(ignore)]
update_gossipsub_scores: tokio::time::Interval,
#[behaviour(ignore)]
gossip_cache: GossipCache,
/// Logger for behaviour actions.
#[behaviour(ignore)]
log: slog::Logger,
@@ -280,6 +287,16 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
..Default::default()
};
let slot_duration = std::time::Duration::from_secs(ctx.chain_spec.seconds_per_slot);
// Half an epoch
let gossip_max_retry_delay = std::time::Duration::from_secs(
ctx.chain_spec.seconds_per_slot * TSpec::slots_per_epoch() / 2,
);
let gossip_cache = GossipCache::builder()
.default_timeout(gossip_max_retry_delay)
.beacon_block_timeout(slot_duration)
.build();
Ok(Behaviour {
// Sub-behaviours
gossipsub,
@@ -297,6 +314,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
log: behaviour_log,
score_settings,
fork_context: ctx.fork_context,
gossip_cache,
update_gossipsub_scores,
})
}
@@ -422,9 +440,11 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
for message in messages {
for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) {
let message_data = message.encode(GossipEncoding::default());
- if let Err(e) = self.gossipsub.publish(topic.clone().into(), message_data) {
- slog::warn!(self.log, "Could not publish message";
- "error" => ?e);
if let Err(e) = self
.gossipsub
.publish(topic.clone().into(), message_data.clone())
{
slog::warn!(self.log, "Could not publish message"; "error" => ?e);
// add to metrics
match topic.kind() {
@@ -445,6 +465,10 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
};
}
}
if let PublishError::InsufficientPeers = e {
self.gossip_cache.insert(topic, message_data);
}
}
}
}
@@ -868,11 +892,39 @@ impl<TSpec: EthSpec> NetworkBehaviourEventProcess<GossipsubEvent> for Behaviour<
}
}
GossipsubEvent::Subscribed { peer_id, topic } => {
- if let Some(subnet_id) = subnet_from_topic_hash(&topic) {
- self.network_globals
- .peers
- .write()
- .add_subscription(&peer_id, subnet_id);
if let Ok(topic) = GossipTopic::decode(topic.as_str()) {
if let Some(subnet_id) = topic.subnet_id() {
self.network_globals
.peers
.write()
.add_subscription(&peer_id, subnet_id);
}
// Try to send the cached messages for this topic
if let Some(msgs) = self.gossip_cache.retrieve(&topic) {
for data in msgs {
let topic_str: &str = topic.kind().as_ref();
match self.gossipsub.publish(topic.clone().into(), data) {
Ok(_) => {
warn!(self.log, "Gossip message published on retry"; "topic" => topic_str);
if let Some(v) = metrics::get_int_counter(
&metrics::GOSSIP_LATE_PUBLISH_PER_TOPIC_KIND,
&[topic_str],
) {
v.inc()
};
}
Err(e) => {
warn!(self.log, "Gossip message publish failed on retry"; "topic" => topic_str, "error" => %e);
if let Some(v) = metrics::get_int_counter(
&metrics::GOSSIP_FAILED_LATE_PUBLISH_PER_TOPIC_KIND,
&[topic_str],
) {
v.inc()
};
}
}
}
}
}
}
GossipsubEvent::Unsubscribed { peer_id, topic } => {
@@ -1125,6 +1177,21 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
self.peer_manager.update_gossipsub_scores(&self.gossipsub);
}
// poll the gossipsub cache to clear expired messages
while let Poll::Ready(Some(result)) = self.gossip_cache.poll_next_unpin(cx) {
match result {
Err(e) => warn!(self.log, "Gossip cache error"; "error" => e),
Ok(expired_topic) => {
if let Some(v) = metrics::get_int_counter(
&metrics::GOSSIP_EXPIRED_LATE_PUBLISH_PER_TOPIC_KIND,
&[expired_topic.kind().as_ref()],
) {
v.inc()
};
}
}
}
Poll::Pending
}
}

View File

@@ -81,14 +81,30 @@ lazy_static! {
"Gossipsub messages that we did not accept, per client",
&["client", "validation_result"]
);
pub static ref GOSSIP_LATE_PUBLISH_PER_TOPIC_KIND: Result<IntCounterVec> =
try_create_int_counter_vec(
"gossipsub_late_publish_per_topic_kind",
"Messages published late to gossipsub per topic kind.",
&["topic_kind"]
);
pub static ref GOSSIP_EXPIRED_LATE_PUBLISH_PER_TOPIC_KIND: Result<IntCounterVec> =
try_create_int_counter_vec(
"gossipsub_expired_late_publish_per_topic_kind",
"Messages that expired waiting to be published on retry to gossipsub per topic kind.",
&["topic_kind"]
);
pub static ref GOSSIP_FAILED_LATE_PUBLISH_PER_TOPIC_KIND: Result<IntCounterVec> =
try_create_int_counter_vec(
"gossipsub_failed_late_publish_per_topic_kind",
"Messages that failed to be published on retry to gossipsub per topic kind.",
&["topic_kind"]
);
pub static ref PEER_SCORE_DISTRIBUTION: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"peer_score_distribution",
"The distribution of connected peer scores",
&["position"]
);
pub static ref PEER_SCORE_PER_CLIENT: Result<GaugeVec> =
try_create_float_gauge_vec(
"peer_score_per_client",

View File

@@ -159,6 +159,14 @@ impl GossipTopic {
Err(format!("Unknown topic: {}", topic))
}
pub fn subnet_id(&self) -> Option<Subnet> {
match self.kind() {
GossipKind::Attestation(subnet_id) => Some(Subnet::Attestation(*subnet_id)),
GossipKind::SyncCommitteeMessage(subnet_id) => Some(Subnet::SyncCommittee(*subnet_id)),
_ => None,
}
}
}
impl From<GossipTopic> for Topic {
@@ -237,12 +245,7 @@ impl From<Subnet> for GossipKind {
/// Get subnet id from an attestation subnet topic hash.
pub fn subnet_from_topic_hash(topic_hash: &TopicHash) -> Option<Subnet> {
- let gossip_topic = GossipTopic::decode(topic_hash.as_str()).ok()?;
- match gossip_topic.kind() {
- GossipKind::Attestation(subnet_id) => Some(Subnet::Attestation(*subnet_id)),
- GossipKind::SyncCommitteeMessage(subnet_id) => Some(Subnet::SyncCommittee(*subnet_id)),
- _ => None,
- }
GossipTopic::decode(topic_hash.as_str()).ok()?.subnet_id()
}
// Determines if a string is an attestation or sync committee topic.