From 80f15f5d700693520ed7ca722e1cd9b0227147c2 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 8 Aug 2019 12:38:54 +1000 Subject: [PATCH] Correct gossipsub message encoding. Add extended topics --- beacon_node/eth2-libp2p/src/behaviour.rs | 54 ++++++++++++++---------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index b87f8a061..749d2e5b4 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,8 +1,8 @@ +use crate::config::*; use crate::discovery::Discovery; use crate::rpc::{RPCEvent, RPCMessage, RPC}; use crate::{error, NetworkConfig}; use crate::{Topic, TopicHash}; -use crate::{BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC}; use futures::prelude::*; use libp2p::{ core::identity::Keypair, @@ -15,7 +15,6 @@ use libp2p::{ NetworkBehaviour, PeerId, }; use slog::{debug, o, trace}; -use ssz::{ssz_encode, Encode}; use std::num::NonZeroU32; use std::time::Duration; @@ -189,9 +188,9 @@ impl Behaviour { /// Publishes a message on the pubsub (gossipsub) behaviour. pub fn publish(&mut self, topics: Vec, message: PubsubMessage) { - let message_bytes = ssz_encode(&message); + let message_data = message.to_data(); for topic in topics { - self.gossipsub.publish(topic, message_bytes.clone()); + self.gossipsub.publish(topic, message_data.clone()); } } @@ -220,13 +219,20 @@ pub enum BehaviourEvent { }, } -/// Messages that are passed to and from the pubsub (Gossipsub) behaviour. +/// Messages that are passed to and from the pubsub (Gossipsub) behaviour. These are encoded and +/// decoded upstream. #[derive(Debug, Clone, PartialEq)] pub enum PubsubMessage { /// Gossipsub message providing notification of a new block. Block(Vec), /// Gossipsub message providing notification of a new attestation. Attestation(Vec), + /// Gossipsub message providing notification of a voluntary exit. + VoluntaryExit(Vec), + /// Gossipsub message providing notification of a new proposer slashing. + ProposerSlashing(Vec), + /// Gossipsub message providing notification of a new attester slashing. + AttesterSlashing(Vec), /// Gossipsub message from an unknown topic. Unknown(Vec), } @@ -240,29 +246,33 @@ impl PubsubMessage { */ fn from_topics(topics: &Vec, data: Vec) -> Self { for topic in topics { - match topic.as_str() { - BEACON_BLOCK_TOPIC => return PubsubMessage::Block(data), - BEACON_ATTESTATION_TOPIC => return PubsubMessage::Attestation(data), - _ => {} + // compare the prefix and postfix, then match on the topic + let topic_parts: Vec<&str> = topic.as_str().split('/').collect(); + if topic_parts.len() == 4 + && topic_parts[1] == TOPIC_PREFIX + && topic_parts[3] == TOPIC_ENCODING_POSTFIX + { + match topic_parts[2] { + BEACON_BLOCK_TOPIC => return PubsubMessage::Block(data), + BEACON_ATTESTATION_TOPIC => return PubsubMessage::Attestation(data), + VOLUNTARY_EXIT_TOPIC => return PubsubMessage::VoluntaryExit(data), + PROPOSER_SLASHING_TOPIC => return PubsubMessage::ProposerSlashing(data), + ATTESTER_SLASHING_TOPIC => return PubsubMessage::AttesterSlashing(data), + _ => {} + } } } PubsubMessage::Unknown(data) } -} -impl Encode for PubsubMessage { - fn is_ssz_fixed_len() -> bool { - false - } - - fn ssz_append(&self, buf: &mut Vec) { + fn to_data(self) -> Vec { match self { - PubsubMessage::Block(inner) - | PubsubMessage::Attestation(inner) - | PubsubMessage::Unknown(inner) => { - // Encode the gossip as a Vec; - buf.append(&mut inner.as_ssz_bytes()); - } + PubsubMessage::Block(data) + | PubsubMessage::Attestation(data) + | PubsubMessage::VoluntaryExit(data) + | PubsubMessage::ProposerSlashing(data) + | PubsubMessage::AttesterSlashing(data) + | PubsubMessage::Unknown(data) => data, } } }