Correct gossipsub message encoding. Add extended topics

This commit is contained in:
Age Manning 2019-08-08 12:38:54 +10:00
parent d83fa67068
commit 80f15f5d70
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93

View File

@ -1,8 +1,8 @@
use crate::config::*;
use crate::discovery::Discovery;
use crate::rpc::{RPCEvent, RPCMessage, RPC};
use crate::{error, NetworkConfig};
use crate::{Topic, TopicHash};
use crate::{BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC};
use futures::prelude::*;
use libp2p::{
core::identity::Keypair,
@ -15,7 +15,6 @@ use libp2p::{
NetworkBehaviour, PeerId,
};
use slog::{debug, o, trace};
use ssz::{ssz_encode, Encode};
use std::num::NonZeroU32;
use std::time::Duration;
@ -189,9 +188,9 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
/// Publishes a message on the pubsub (gossipsub) behaviour.
pub fn publish(&mut self, topics: Vec<Topic>, message: PubsubMessage) {
let message_bytes = ssz_encode(&message);
let message_data = message.to_data();
for topic in topics {
self.gossipsub.publish(topic, message_bytes.clone());
self.gossipsub.publish(topic, message_data.clone());
}
}
@ -220,13 +219,20 @@ pub enum BehaviourEvent {
},
}
/// Messages that are passed to and from the pubsub (Gossipsub) behaviour.
/// Messages that are passed to and from the pubsub (Gossipsub) behaviour. These are encoded and
/// decoded upstream.
#[derive(Debug, Clone, PartialEq)]
pub enum PubsubMessage {
/// Gossipsub message providing notification of a new block.
Block(Vec<u8>),
/// Gossipsub message providing notification of a new attestation.
Attestation(Vec<u8>),
/// Gossipsub message providing notification of a voluntary exit.
VoluntaryExit(Vec<u8>),
/// Gossipsub message providing notification of a new proposer slashing.
ProposerSlashing(Vec<u8>),
/// Gossipsub message providing notification of a new attester slashing.
AttesterSlashing(Vec<u8>),
/// Gossipsub message from an unknown topic.
Unknown(Vec<u8>),
}
@ -240,29 +246,33 @@ impl PubsubMessage {
*/
fn from_topics(topics: &Vec<TopicHash>, data: Vec<u8>) -> Self {
for topic in topics {
match topic.as_str() {
// compare the prefix and postfix, then match on the topic
let topic_parts: Vec<&str> = topic.as_str().split('/').collect();
if topic_parts.len() == 4
&& topic_parts[1] == TOPIC_PREFIX
&& topic_parts[3] == TOPIC_ENCODING_POSTFIX
{
match topic_parts[2] {
BEACON_BLOCK_TOPIC => return PubsubMessage::Block(data),
BEACON_ATTESTATION_TOPIC => return PubsubMessage::Attestation(data),
VOLUNTARY_EXIT_TOPIC => return PubsubMessage::VoluntaryExit(data),
PROPOSER_SLASHING_TOPIC => return PubsubMessage::ProposerSlashing(data),
ATTESTER_SLASHING_TOPIC => return PubsubMessage::AttesterSlashing(data),
_ => {}
}
}
}
PubsubMessage::Unknown(data)
}
}
impl Encode for PubsubMessage {
fn is_ssz_fixed_len() -> bool {
false
}
fn ssz_append(&self, buf: &mut Vec<u8>) {
fn to_data(self) -> Vec<u8> {
match self {
PubsubMessage::Block(inner)
| PubsubMessage::Attestation(inner)
| PubsubMessage::Unknown(inner) => {
// Encode the gossip as a Vec<u8>;
buf.append(&mut inner.as_ssz_bytes());
}
PubsubMessage::Block(data)
| PubsubMessage::Attestation(data)
| PubsubMessage::VoluntaryExit(data)
| PubsubMessage::ProposerSlashing(data)
| PubsubMessage::AttesterSlashing(data)
| PubsubMessage::Unknown(data) => data,
}
}
}