Add snappy encoding to gossipsub messages (#984)
* Add snappy encode/decode to gossip messages * Fix gossipsub tests
This commit is contained in:
parent
cc63c2b769
commit
d7e2938296
7
Cargo.lock
generated
7
Cargo.lock
generated
@ -1207,6 +1207,7 @@ dependencies = [
|
||||
"slog-stdlog 4.0.0",
|
||||
"slog-term",
|
||||
"smallvec 1.2.0",
|
||||
"snap",
|
||||
"tempdir",
|
||||
"tokio",
|
||||
"tokio-io-timeout",
|
||||
@ -4173,6 +4174,12 @@ version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c2fb2ec9bcd216a5b0d0ccf31ab17b5ed1d627960edff65bbe95d3ce221cefc"
|
||||
|
||||
[[package]]
|
||||
name = "snap"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f7fb9b0bb877b35a1cc1474a3b43d9c226a2625311760cdda2cbccbc0c7a8376"
|
||||
|
||||
[[package]]
|
||||
name = "snow"
|
||||
version = "0.6.2"
|
||||
|
@ -31,6 +31,7 @@ lru = "0.4.3"
|
||||
parking_lot = "0.9.0"
|
||||
sha2 = "0.8.0"
|
||||
base64 = "0.11.0"
|
||||
snap = "1"
|
||||
|
||||
[dev-dependencies]
|
||||
slog-stdlog = "4.0.0"
|
||||
|
@ -105,16 +105,22 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
|
||||
/// Subscribes to a gossipsub topic kind, letting the network service determine the
|
||||
/// encoding and fork version.
|
||||
pub fn subscribe_kind(&mut self, kind: GossipKind) -> bool {
|
||||
let gossip_topic =
|
||||
GossipTopic::new(kind, GossipEncoding::SSZ, self.enr_fork_id.fork_digest);
|
||||
let gossip_topic = GossipTopic::new(
|
||||
kind,
|
||||
GossipEncoding::default(),
|
||||
self.enr_fork_id.fork_digest,
|
||||
);
|
||||
self.subscribe(gossip_topic)
|
||||
}
|
||||
|
||||
/// Unsubscribes from a gossipsub topic kind, letting the network service determine the
|
||||
/// encoding and fork version.
|
||||
pub fn unsubscribe_kind(&mut self, kind: GossipKind) -> bool {
|
||||
let gossip_topic =
|
||||
GossipTopic::new(kind, GossipEncoding::SSZ, self.enr_fork_id.fork_digest);
|
||||
let gossip_topic = GossipTopic::new(
|
||||
kind,
|
||||
GossipEncoding::default(),
|
||||
self.enr_fork_id.fork_digest,
|
||||
);
|
||||
self.unsubscribe(gossip_topic)
|
||||
}
|
||||
|
||||
@ -122,7 +128,7 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
|
||||
pub fn subscribe_to_subnet(&mut self, subnet_id: SubnetId) -> bool {
|
||||
let topic = GossipTopic::new(
|
||||
subnet_id.into(),
|
||||
GossipEncoding::SSZ,
|
||||
GossipEncoding::default(),
|
||||
self.enr_fork_id.fork_digest,
|
||||
);
|
||||
self.subscribe(topic)
|
||||
@ -132,7 +138,7 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
|
||||
pub fn unsubscribe_from_subnet(&mut self, subnet_id: SubnetId) -> bool {
|
||||
let topic = GossipTopic::new(
|
||||
subnet_id.into(),
|
||||
GossipEncoding::SSZ,
|
||||
GossipEncoding::default(),
|
||||
self.enr_fork_id.fork_digest,
|
||||
);
|
||||
self.unsubscribe(topic)
|
||||
@ -165,9 +171,13 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
|
||||
/// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding.
|
||||
pub fn publish(&mut self, messages: Vec<PubsubMessage<TSpec>>) {
|
||||
for message in messages {
|
||||
for topic in message.topics(GossipEncoding::SSZ, self.enr_fork_id.fork_digest) {
|
||||
let message_data = message.encode(GossipEncoding::SSZ);
|
||||
self.gossipsub.publish(&topic.into(), message_data);
|
||||
for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) {
|
||||
match message.encode(GossipEncoding::default()) {
|
||||
Ok(message_data) => {
|
||||
self.gossipsub.publish(&topic.into(), message_data);
|
||||
}
|
||||
Err(e) => crit!(self.log, "Could not publish message"; "error" => e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -8,6 +8,8 @@ use sha2::{Digest, Sha256};
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
pub const GOSSIP_MAX_SIZE: usize = 1_048_576;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
/// Network configuration for lighthouse.
|
||||
@ -98,7 +100,7 @@ impl Default for Config {
|
||||
// Note: The topics by default are sent as plain strings. Hashes are an optional
|
||||
// parameter.
|
||||
let gs_config = GossipsubConfigBuilder::new()
|
||||
.max_transmit_size(1_048_576)
|
||||
.max_transmit_size(GOSSIP_MAX_SIZE)
|
||||
.heartbeat_interval(Duration::from_secs(20)) // TODO: Reduce for mainnet
|
||||
.manual_propagation() // require validation before propagation
|
||||
.no_source_id()
|
||||
|
@ -1,7 +1,9 @@
|
||||
//! Handles the encoding and decoding of pubsub messages.
|
||||
|
||||
use crate::config::GOSSIP_MAX_SIZE;
|
||||
use crate::types::{GossipEncoding, GossipKind, GossipTopic};
|
||||
use crate::TopicHash;
|
||||
use snap::raw::{decompress_len, Decoder, Encoder};
|
||||
use ssz::{Decode, Encode};
|
||||
use std::boxed::Box;
|
||||
use types::SubnetId;
|
||||
@ -66,55 +68,71 @@ impl<T: EthSpec> PubsubMessage<T> {
|
||||
continue;
|
||||
}
|
||||
Ok(gossip_topic) => {
|
||||
match gossip_topic.encoding() {
|
||||
let mut decompressed_data: Vec<u8> = Vec::new();
|
||||
let data = match gossip_topic.encoding() {
|
||||
// group each part by encoding type
|
||||
GossipEncoding::SSZ => {
|
||||
// the ssz decoders
|
||||
match gossip_topic.kind() {
|
||||
GossipKind::BeaconAggregateAndProof => {
|
||||
let agg_and_proof =
|
||||
SignedAggregateAndProof::from_ssz_bytes(data)
|
||||
.map_err(|e| format!("{:?}", e))?;
|
||||
return Ok(PubsubMessage::AggregateAndProofAttestation(
|
||||
Box::new(agg_and_proof),
|
||||
));
|
||||
GossipEncoding::SSZSnappy => {
|
||||
match decompress_len(data) {
|
||||
Ok(n) if n > GOSSIP_MAX_SIZE => {
|
||||
return Err("ssz_snappy decoded data > GOSSIP_MAX_SIZE".into());
|
||||
}
|
||||
GossipKind::CommitteeIndex(subnet_id) => {
|
||||
let attestation = Attestation::from_ssz_bytes(data)
|
||||
.map_err(|e| format!("{:?}", e))?;
|
||||
return Ok(PubsubMessage::Attestation(Box::new((
|
||||
*subnet_id,
|
||||
attestation,
|
||||
))));
|
||||
Ok(n) => decompressed_data.resize(n, 0),
|
||||
Err(e) => {
|
||||
return Err(format!("{}", e));
|
||||
}
|
||||
GossipKind::BeaconBlock => {
|
||||
let beacon_block = SignedBeaconBlock::from_ssz_bytes(data)
|
||||
.map_err(|e| format!("{:?}", e))?;
|
||||
return Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block)));
|
||||
}
|
||||
GossipKind::VoluntaryExit => {
|
||||
let voluntary_exit = VoluntaryExit::from_ssz_bytes(data)
|
||||
.map_err(|e| format!("{:?}", e))?;
|
||||
return Ok(PubsubMessage::VoluntaryExit(Box::new(
|
||||
voluntary_exit,
|
||||
)));
|
||||
}
|
||||
GossipKind::ProposerSlashing => {
|
||||
let proposer_slashing = ProposerSlashing::from_ssz_bytes(data)
|
||||
.map_err(|e| format!("{:?}", e))?;
|
||||
return Ok(PubsubMessage::ProposerSlashing(Box::new(
|
||||
proposer_slashing,
|
||||
)));
|
||||
}
|
||||
GossipKind::AttesterSlashing => {
|
||||
let attester_slashing = AttesterSlashing::from_ssz_bytes(data)
|
||||
.map_err(|e| format!("{:?}", e))?;
|
||||
return Ok(PubsubMessage::AttesterSlashing(Box::new(
|
||||
attester_slashing,
|
||||
)));
|
||||
};
|
||||
let mut decoder = Decoder::new();
|
||||
match decoder.decompress(data, &mut decompressed_data) {
|
||||
Ok(n) => {
|
||||
decompressed_data.truncate(n);
|
||||
&decompressed_data
|
||||
}
|
||||
Err(e) => return Err(format!("{}", e)),
|
||||
}
|
||||
}
|
||||
GossipEncoding::SSZ => data,
|
||||
};
|
||||
// the ssz decoders
|
||||
match gossip_topic.kind() {
|
||||
GossipKind::BeaconAggregateAndProof => {
|
||||
let agg_and_proof = SignedAggregateAndProof::from_ssz_bytes(data)
|
||||
.map_err(|e| format!("{:?}", e))?;
|
||||
return Ok(PubsubMessage::AggregateAndProofAttestation(Box::new(
|
||||
agg_and_proof,
|
||||
)));
|
||||
}
|
||||
GossipKind::CommitteeIndex(subnet_id) => {
|
||||
let attestation = Attestation::from_ssz_bytes(data)
|
||||
.map_err(|e| format!("{:?}", e))?;
|
||||
return Ok(PubsubMessage::Attestation(Box::new((
|
||||
*subnet_id,
|
||||
attestation,
|
||||
))));
|
||||
}
|
||||
GossipKind::BeaconBlock => {
|
||||
let beacon_block = SignedBeaconBlock::from_ssz_bytes(data)
|
||||
.map_err(|e| format!("{:?}", e))?;
|
||||
return Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block)));
|
||||
}
|
||||
GossipKind::VoluntaryExit => {
|
||||
let voluntary_exit = VoluntaryExit::from_ssz_bytes(data)
|
||||
.map_err(|e| format!("{:?}", e))?;
|
||||
return Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit)));
|
||||
}
|
||||
GossipKind::ProposerSlashing => {
|
||||
let proposer_slashing = ProposerSlashing::from_ssz_bytes(data)
|
||||
.map_err(|e| format!("{:?}", e))?;
|
||||
return Ok(PubsubMessage::ProposerSlashing(Box::new(
|
||||
proposer_slashing,
|
||||
)));
|
||||
}
|
||||
GossipKind::AttesterSlashing => {
|
||||
let attester_slashing = AttesterSlashing::from_ssz_bytes(data)
|
||||
.map_err(|e| format!("{:?}", e))?;
|
||||
return Ok(PubsubMessage::AttesterSlashing(Box::new(
|
||||
attester_slashing,
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -124,18 +142,32 @@ impl<T: EthSpec> PubsubMessage<T> {
|
||||
|
||||
/// Encodes a `PubsubMessage` based on the topic encodings. The first known encoding is used. If
|
||||
/// no encoding is known, and error is returned.
|
||||
pub fn encode(&self, encoding: GossipEncoding) -> Vec<u8> {
|
||||
pub fn encode(&self, encoding: GossipEncoding) -> Result<Vec<u8>, String> {
|
||||
let data = match &self {
|
||||
PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(),
|
||||
PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(),
|
||||
PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(),
|
||||
PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(),
|
||||
PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(),
|
||||
PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(),
|
||||
};
|
||||
match encoding {
|
||||
GossipEncoding::SSZ => {
|
||||
// SSZ Encodings
|
||||
return match &self {
|
||||
PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(),
|
||||
PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(),
|
||||
PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(),
|
||||
PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(),
|
||||
PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(),
|
||||
PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(),
|
||||
};
|
||||
if data.len() > GOSSIP_MAX_SIZE {
|
||||
return Err("ssz encoded data > GOSSIP_MAX_SIZE".into());
|
||||
} else {
|
||||
Ok(data)
|
||||
}
|
||||
}
|
||||
GossipEncoding::SSZSnappy => {
|
||||
let mut encoder = Encoder::new();
|
||||
match encoder.compress_vec(&data) {
|
||||
Ok(compressed) if compressed.len() > GOSSIP_MAX_SIZE => {
|
||||
Err("ssz_snappy Encoded data > GOSSIP_MAX_SIZE".into())
|
||||
}
|
||||
Ok(compressed) => Ok(compressed),
|
||||
Err(e) => Err(format!("{}", e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ use types::SubnetId;
|
||||
// For example /eth2/beacon_block/ssz
|
||||
pub const TOPIC_PREFIX: &str = "eth2";
|
||||
pub const SSZ_ENCODING_POSTFIX: &str = "ssz";
|
||||
pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy";
|
||||
pub const BEACON_BLOCK_TOPIC: &str = "beacon_block";
|
||||
pub const BEACON_AGGREGATE_AND_PROOF_TOPIC: &str = "beacon_aggregate_and_proof";
|
||||
// for speed and easier string manipulation, committee topic index is split into a prefix and a
|
||||
@ -65,6 +66,14 @@ impl std::fmt::Display for GossipKind {
|
||||
pub enum GossipEncoding {
|
||||
/// Messages are encoded with SSZ.
|
||||
SSZ,
|
||||
/// Messages are encoded with SSZSnappy.
|
||||
SSZSnappy,
|
||||
}
|
||||
|
||||
impl Default for GossipEncoding {
|
||||
fn default() -> Self {
|
||||
GossipEncoding::SSZSnappy
|
||||
}
|
||||
}
|
||||
|
||||
impl GossipTopic {
|
||||
@ -109,6 +118,7 @@ impl GossipTopic {
|
||||
|
||||
let encoding = match topic_parts[4] {
|
||||
SSZ_ENCODING_POSTFIX => GossipEncoding::SSZ,
|
||||
SSZ_SNAPPY_ENCODING_POSTFIX => GossipEncoding::SSZSnappy,
|
||||
_ => return Err(format!("Unknown encoding: {}", topic)),
|
||||
};
|
||||
let kind = match topic_parts[3] {
|
||||
@ -144,6 +154,7 @@ impl Into<String> for GossipTopic {
|
||||
fn into(self) -> String {
|
||||
let encoding = match self.encoding {
|
||||
GossipEncoding::SSZ => SSZ_ENCODING_POSTFIX,
|
||||
GossipEncoding::SSZSnappy => SSZ_SNAPPY_ENCODING_POSTFIX,
|
||||
};
|
||||
|
||||
let kind = match self.kind {
|
||||
|
@ -35,7 +35,7 @@ fn test_gossipsub_forward() {
|
||||
};
|
||||
let pubsub_message = PubsubMessage::BeaconBlock(Box::new(signed_block));
|
||||
let publishing_topic: String = pubsub_message
|
||||
.topics(GossipEncoding::SSZ, [0, 0, 0, 0])
|
||||
.topics(GossipEncoding::default(), [0, 0, 0, 0])
|
||||
.first()
|
||||
.unwrap()
|
||||
.clone()
|
||||
@ -108,7 +108,7 @@ fn test_gossipsub_full_mesh_publish() {
|
||||
};
|
||||
let pubsub_message = PubsubMessage::BeaconBlock(Box::new(signed_block));
|
||||
let publishing_topic: String = pubsub_message
|
||||
.topics(GossipEncoding::SSZ, [0, 0, 0, 0])
|
||||
.topics(GossipEncoding::default(), [0, 0, 0, 0])
|
||||
.first()
|
||||
.unwrap()
|
||||
.clone()
|
||||
|
Loading…
Reference in New Issue
Block a user