Updates the message-id according to the Networking Spec (#1752)

## Proposed Changes

Implement the new message id function (see https://github.com/ethereum/eth2.0-specs/pull/2089) using an additional fast message id function for better performance + caching decompressed data.
This commit is contained in:
blacktemplar 2020-10-14 06:51:58 +00:00
parent 467de4c8d0
commit 8248afa793
11 changed files with 113 additions and 51 deletions

26
Cargo.lock generated
View File

@ -2917,7 +2917,7 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a"
[[package]]
name = "libp2p"
version = "0.29.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"atomic",
"bytes 0.5.6",
@ -2978,7 +2978,7 @@ dependencies = [
[[package]]
name = "libp2p-core"
version = "0.22.2"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"asn1_der",
"bs58",
@ -3011,7 +3011,7 @@ dependencies = [
[[package]]
name = "libp2p-core-derive"
version = "0.20.2"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"quote",
"syn",
@ -3020,7 +3020,7 @@ dependencies = [
[[package]]
name = "libp2p-dns"
version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"futures 0.3.6",
"libp2p-core 0.22.2",
@ -3030,7 +3030,7 @@ dependencies = [
[[package]]
name = "libp2p-gossipsub"
version = "0.22.1"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"base64 0.12.3",
"byteorder",
@ -3054,7 +3054,7 @@ dependencies = [
[[package]]
name = "libp2p-identify"
version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"futures 0.3.6",
"libp2p-core 0.22.2",
@ -3069,7 +3069,7 @@ dependencies = [
[[package]]
name = "libp2p-mplex"
version = "0.23.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"bytes 0.5.6",
"fnv",
@ -3084,7 +3084,7 @@ dependencies = [
[[package]]
name = "libp2p-noise"
version = "0.24.1"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"bytes 0.5.6",
"curve25519-dalek",
@ -3105,7 +3105,7 @@ dependencies = [
[[package]]
name = "libp2p-swarm"
version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"either",
"futures 0.3.6",
@ -3120,7 +3120,7 @@ dependencies = [
[[package]]
name = "libp2p-tcp"
version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"futures 0.3.6",
"futures-timer",
@ -3135,7 +3135,7 @@ dependencies = [
[[package]]
name = "libp2p-websocket"
version = "0.23.1"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"async-tls",
"either",
@ -3552,7 +3552,7 @@ dependencies = [
[[package]]
name = "multistream-select"
version = "0.8.3"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"bytes 0.5.6",
"futures 0.3.6",
@ -3851,7 +3851,7 @@ dependencies = [
[[package]]
name = "parity-multiaddr"
version = "0.9.3"
source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
dependencies = [
"arrayref",
"bs58",

View File

@ -42,7 +42,7 @@ regex = "1.3.9"
[dependencies.libp2p]
#version = "0.23.0"
git = "https://github.com/sigp/rust-libp2p"
rev = "5a9f0819af3990cfefad528e957297af596399b4"
rev = "fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
default-features = false
features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"]

View File

@ -1,8 +1,8 @@
use crate::behaviour::Gossipsub;
use crate::rpc::*;
use libp2p::{
core::either::{EitherError, EitherOutput},
core::upgrade::{EitherUpgrade, InboundUpgrade, OutboundUpgrade, SelectUpgrade, UpgradeError},
gossipsub::Gossipsub,
identify::Identify,
swarm::{
protocols_handler::{

View File

@ -1,3 +1,4 @@
use crate::behaviour::Gossipsub;
use crate::rpc::*;
use delegate::DelegatingHandler;
pub(super) use delegate::{
@ -5,7 +6,6 @@ pub(super) use delegate::{
};
use libp2p::{
core::upgrade::{InboundUpgrade, OutboundUpgrade},
gossipsub::Gossipsub,
identify::Identify,
swarm::protocols_handler::{
KeepAlive, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,

View File

@ -1,7 +1,7 @@
use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent};
use crate::rpc::*;
use crate::service::METADATA_FILENAME;
use crate::types::{GossipEncoding, GossipKind, GossipTopic, SubnetDiscovery};
use crate::types::{GossipEncoding, GossipKind, GossipTopic, MessageData, SubnetDiscovery};
use crate::Eth2Enr;
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*;
@ -13,8 +13,8 @@ use libp2p::{
Multiaddr,
},
gossipsub::{
Gossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity,
MessageId,
GenericGossipsub, GenericGossipsubEvent, IdentTopic as Topic, MessageAcceptance,
MessageAuthenticity, MessageId,
},
identify::{Identify, IdentifyEvent},
swarm::{
@ -43,6 +43,9 @@ const MAX_IDENTIFY_ADDRESSES: usize = 10;
/// Identifier of requests sent by a peer.
pub type PeerRequestId = (ConnectionId, SubstreamId);
pub type Gossipsub = GenericGossipsub<MessageData>;
pub type GossipsubEvent = GenericGossipsubEvent<MessageData>;
/// The types of events than can be obtained from polling the behaviour.
#[derive(Debug)]
pub enum BehaviourEvent<TSpec: EthSpec> {
@ -518,7 +521,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
} => {
// Note: We are keeping track here of the peer that sent us the message, not the
// peer that originally published the message.
match PubsubMessage::decode(&gs_msg.topics, &gs_msg.data) {
match PubsubMessage::decode(&gs_msg.topics, gs_msg.data()) {
Err(e) => {
debug!(self.log, "Could not decode gossipsub message"; "error" => e);
//reject the message

View File

@ -1,11 +1,12 @@
use crate::types::GossipKind;
use crate::types::{GossipKind, MessageData};
use crate::{Enr, PeerIdSerialized};
use directory::{
DEFAULT_BEACON_NODE_DIR, DEFAULT_HARDCODED_TESTNET, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR,
};
use discv5::{Discv5Config, Discv5ConfigBuilder};
use libp2p::gossipsub::{
GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageId, ValidationMode,
FastMessageId, GenericGossipsubConfig, GenericGossipsubConfigBuilder, GenericGossipsubMessage,
MessageId, RawGossipsubMessage, ValidationMode,
};
use libp2p::Multiaddr;
use serde_derive::{Deserialize, Serialize};
@ -14,6 +15,12 @@ use std::path::PathBuf;
use std::time::Duration;
pub const GOSSIP_MAX_SIZE: usize = 1_048_576;
const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0, 0, 0, 0];
const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [1, 0, 0, 0];
pub type GossipsubConfig = GenericGossipsubConfig<MessageData>;
pub type GossipsubConfigBuilder = GenericGossipsubConfigBuilder<MessageData>;
pub type GossipsubMessage = GenericGossipsubMessage<MessageData>;
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
@ -91,8 +98,30 @@ impl Default for Config {
// The function used to generate a gossipsub message id
// We use the first 8 bytes of SHA256(data) for content addressing
let gossip_message_id =
|message: &GossipsubMessage| MessageId::from(&Sha256::digest(&message.data)[..]);
let fast_gossip_message_id =
|message: &RawGossipsubMessage| FastMessageId::from(&Sha256::digest(&message.data)[..]);
fn prefix(prefix: [u8; 4], data: &[u8]) -> Vec<u8> {
prefix
.to_vec()
.into_iter()
.chain(data.iter().cloned())
.collect()
}
let gossip_message_id = |message: &GossipsubMessage| {
MessageId::from(
&Sha256::digest(
{
match &message.data.decompressed {
Ok(decompressed) => prefix(MESSAGE_DOMAIN_VALID_SNAPPY, decompressed),
_ => prefix(MESSAGE_DOMAIN_INVALID_SNAPPY, &message.data.raw),
}
}
.as_slice(),
)[..20],
)
};
// gossipsub configuration
// Note: The topics by default are sent as plain strings. Hashes are an optional
@ -112,6 +141,7 @@ impl Default for Config {
// prevent duplicates for 550 heartbeats(700millis * 550) = 385 secs
.duplicate_cache_time(Duration::from_secs(385))
.message_id_fn(gossip_message_id)
.fast_message_id_fn(fast_gossip_message_id)
.build()
.expect("valid gossipsub configuration");

View File

@ -59,11 +59,12 @@ impl<'de> Deserialize<'de> for PeerIdSerialized {
}
pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, SubnetDiscovery};
pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response};
pub use behaviour::{BehaviourEvent, Gossipsub, PeerRequestId, Request, Response};
pub use config::Config as NetworkConfig;
pub use config::{GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage};
pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr};
pub use discv5;
pub use libp2p::gossipsub::{Gossipsub, MessageAcceptance, MessageId, Topic, TopicHash};
pub use libp2p::gossipsub::{MessageAcceptance, MessageId, Topic, TopicHash};
pub use libp2p::{core::ConnectedPoint, PeerId, Swarm};
pub use libp2p::{multiaddr, Multiaddr};
pub use metrics::scrape_discovery_metrics;

View File

@ -13,7 +13,7 @@ pub type EnrBitfield<T: EthSpec> = BitVector<T::SubnetBitfieldLength>;
pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use globals::NetworkGlobals;
pub use pubsub::PubsubMessage;
pub use pubsub::{MessageData, PubsubMessage};
pub use subnet::SubnetDiscovery;
pub use sync_state::SyncState;
pub use topics::{GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS};

View File

@ -12,6 +12,33 @@ use types::{
SignedBeaconBlock, SignedVoluntaryExit,
};
#[derive(Clone)]
pub struct MessageData {
pub raw: Vec<u8>,
pub decompressed: Result<Vec<u8>, String>,
}
impl AsRef<[u8]> for MessageData {
fn as_ref(&self) -> &[u8] {
self.raw.as_ref()
}
}
impl Into<Vec<u8>> for MessageData {
fn into(self) -> Vec<u8> {
self.raw
}
}
impl From<Vec<u8>> for MessageData {
fn from(raw: Vec<u8>) -> Self {
Self {
decompressed: decompress_snappy(raw.as_ref()),
raw,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum PubsubMessage<T: EthSpec> {
/// Gossipsub message providing notification of a new block.
@ -28,6 +55,24 @@ pub enum PubsubMessage<T: EthSpec> {
AttesterSlashing(Box<AttesterSlashing<T>>),
}
fn decompress_snappy(data: &[u8]) -> Result<Vec<u8>, String> {
// Exit early if uncompressed data is > GOSSIP_MAX_SIZE
match decompress_len(data) {
Ok(n) if n > GOSSIP_MAX_SIZE => {
return Err("ssz_snappy decoded data > GOSSIP_MAX_SIZE".into());
}
Ok(_) => {}
Err(e) => {
return Err(format!("{}", e));
}
};
let mut decoder = Decoder::new();
match decoder.decompress_vec(data) {
Ok(decompressed_data) => Ok(decompressed_data),
Err(e) => Err(format!("{}", e)),
}
}
impl<T: EthSpec> PubsubMessage<T> {
/// Returns the topics that each pubsub message will be sent across, given a supported
/// gossipsub encoding and fork version.
@ -59,7 +104,7 @@ impl<T: EthSpec> PubsubMessage<T> {
* Also note that a message can be associated with many topics. As soon as one of the topics is
* known we match. If none of the topics are known we return an unknown state.
*/
pub fn decode(topics: &[TopicHash], data: &[u8]) -> Result<Self, String> {
pub fn decode(topics: &[TopicHash], data: &MessageData) -> Result<Self, String> {
let mut unknown_topics = Vec::new();
for topic in topics {
match GossipTopic::decode(topic.as_str()) {
@ -68,25 +113,9 @@ impl<T: EthSpec> PubsubMessage<T> {
continue;
}
Ok(gossip_topic) => {
let decompressed_data = &(match gossip_topic.encoding() {
GossipEncoding::SSZSnappy => {
// Exit early if uncompressed data is > GOSSIP_MAX_SIZE
match decompress_len(data) {
Ok(n) if n > GOSSIP_MAX_SIZE => {
return Err("ssz_snappy decoded data > GOSSIP_MAX_SIZE".into());
}
Ok(_) => {}
Err(e) => {
return Err(format!("{}", e));
}
let decompressed_data = match gossip_topic.encoding() {
GossipEncoding::SSZSnappy => data.decompressed.as_ref()?.as_slice(),
};
let mut decoder = Decoder::new();
match decoder.decompress_vec(data) {
Ok(decompressed_data) => decompressed_data,
Err(e) => return Err(format!("{}", e)),
}
}
});
// the ssz decoders
match gossip_topic.kind() {
GossipKind::BeaconAggregateAndProof => {

View File

@ -3,14 +3,13 @@ use eth2_libp2p::Enr;
use eth2_libp2p::EnrExt;
use eth2_libp2p::Multiaddr;
use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{Libp2pEvent, NetworkConfig};
use eth2_libp2p::{GossipsubConfigBuilder, Libp2pEvent, NetworkConfig};
use slog::{debug, error, o, Drain};
use std::net::{TcpListener, UdpSocket};
use std::time::Duration;
use types::{EnrForkId, MinimalEthSpec};
type E = MinimalEthSpec;
use libp2p::gossipsub::GossipsubConfigBuilder;
use tempdir::TempDir;
pub struct Libp2pInstance(LibP2PService<E>, exit_future::Signal);

View File

@ -8,7 +8,7 @@ use crate::{error, metrics};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{
rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId},
Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, Request, Response,
Gossipsub, Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, Request, Response,
};
use eth2_libp2p::{
types::GossipKind, BehaviourEvent, GossipTopic, MessageId, NetworkGlobals, PeerId, TopicHash,
@ -537,7 +537,7 @@ fn expose_receive_metrics<T: EthSpec>(message: &PubsubMessage<T>) {
}
}
fn update_gossip_metrics<T: EthSpec>(gossipsub: &eth2_libp2p::Gossipsub) {
fn update_gossip_metrics<T: EthSpec>(gossipsub: &Gossipsub) {
// Clear the metrics
let _ = metrics::PEERS_PER_PROTOCOL
.as_ref()