Attempt to publish to at least mesh_n peers (#5357)
* Code improvements * Fix gossipsub tests * Merge latest unstable * Differentiate errors and better scoring * Attempt to publish to mesh_n peers
This commit is contained in:
parent
b9614571a3
commit
fc8f1a4ca7
@ -635,9 +635,33 @@ where
|
|||||||
|| !self.score_below_threshold(p, |ts| ts.publish_threshold).0
|
|| !self.score_below_threshold(p, |ts| ts.publish_threshold).0
|
||||||
}));
|
}));
|
||||||
} else {
|
} else {
|
||||||
match self.mesh.get(&raw_message.topic) {
|
match self.mesh.get(&topic_hash) {
|
||||||
// Mesh peers
|
// Mesh peers
|
||||||
Some(mesh_peers) => {
|
Some(mesh_peers) => {
|
||||||
|
// We have a mesh set. We want to make sure to publish to at least `mesh_n`
|
||||||
|
// peers (if possible).
|
||||||
|
let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len());
|
||||||
|
|
||||||
|
if needed_extra_peers > 0 {
|
||||||
|
// We don't have `mesh_n` peers in our mesh, we will randomly select extras
|
||||||
|
// and publish to them.
|
||||||
|
|
||||||
|
// Get a random set of peers that are appropriate to send messages too.
|
||||||
|
let peer_list = get_random_peers(
|
||||||
|
&self.connected_peers,
|
||||||
|
&topic_hash,
|
||||||
|
needed_extra_peers,
|
||||||
|
|peer| {
|
||||||
|
!mesh_peers.contains(peer)
|
||||||
|
&& !self.explicit_peers.contains(peer)
|
||||||
|
&& !self
|
||||||
|
.score_below_threshold(peer, |pst| pst.publish_threshold)
|
||||||
|
.0
|
||||||
|
},
|
||||||
|
);
|
||||||
|
recipient_peers.extend(peer_list);
|
||||||
|
}
|
||||||
|
|
||||||
recipient_peers.extend(mesh_peers);
|
recipient_peers.extend(mesh_peers);
|
||||||
}
|
}
|
||||||
// Gossipsub peers
|
// Gossipsub peers
|
||||||
@ -729,10 +753,14 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if publish_failed {
|
if recipient_peers.is_empty() {
|
||||||
return Err(PublishError::InsufficientPeers);
|
return Err(PublishError::InsufficientPeers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if publish_failed {
|
||||||
|
return Err(PublishError::AllQueuesFull(recipient_peers.len()));
|
||||||
|
}
|
||||||
|
|
||||||
tracing::debug!(message=%msg_id, "Published message");
|
tracing::debug!(message=%msg_id, "Published message");
|
||||||
|
|
||||||
if let Some(metrics) = self.metrics.as_mut() {
|
if let Some(metrics) = self.metrics.as_mut() {
|
||||||
@ -2203,11 +2231,10 @@ where
|
|||||||
if outbound <= self.config.mesh_outbound_min() {
|
if outbound <= self.config.mesh_outbound_min() {
|
||||||
// do not remove anymore outbound peers
|
// do not remove anymore outbound peers
|
||||||
continue;
|
continue;
|
||||||
} else {
|
}
|
||||||
// an outbound peer gets removed
|
// an outbound peer gets removed
|
||||||
outbound -= 1;
|
outbound -= 1;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// remove the peer
|
// remove the peer
|
||||||
peers.remove(&peer);
|
peers.remove(&peer);
|
||||||
|
@ -741,8 +741,8 @@ fn test_publish_without_flood_publishing() {
|
|||||||
let config: Config = Config::default();
|
let config: Config = Config::default();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
publishes.len(),
|
publishes.len(),
|
||||||
config.mesh_n_low(),
|
config.mesh_n(),
|
||||||
"Should send a publish message to all known peers"
|
"Should send a publish message to at least mesh_n peers"
|
||||||
);
|
);
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
|
@ -36,6 +36,9 @@ pub enum PublishError {
|
|||||||
MessageTooLarge,
|
MessageTooLarge,
|
||||||
/// The compression algorithm failed.
|
/// The compression algorithm failed.
|
||||||
TransformFailed(std::io::Error),
|
TransformFailed(std::io::Error),
|
||||||
|
/// Messages could not be sent because all queues for peers were full. The usize represents the
|
||||||
|
/// number of peers that have full queues.
|
||||||
|
AllQueuesFull(usize),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for PublishError {
|
impl std::fmt::Display for PublishError {
|
||||||
|
Loading…
Reference in New Issue
Block a user