Gossipsub topic filters (#1767)

## Proposed Changes

Adds a gossipsub topic filter that only allows subscribing and incoming subscriptions from valid ETH2 topics.

## Additional Info

Currently the preparation of the valid topic hashes uses only the current fork id but in the future it must also use all possible future fork ids for planned forks. This has to get added when hard coded forks get implemented.

DO NOT MERGE: We first need to merge the libp2p changes (see https://github.com/sigp/rust-libp2p/pull/70) so that we can refer from here to a commit hash inside the lighthouse branch.
This commit is contained in:
blacktemplar 2020-10-14 10:12:57 +00:00
parent 8248afa793
commit a0634cc64f
3 changed files with 60 additions and 17 deletions

26
Cargo.lock generated
View File

@ -2917,7 +2917,7 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a"
[[package]] [[package]]
name = "libp2p" name = "libp2p"
version = "0.29.0" version = "0.29.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"atomic", "atomic",
"bytes 0.5.6", "bytes 0.5.6",
@ -2978,7 +2978,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-core" name = "libp2p-core"
version = "0.22.2" version = "0.22.2"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"asn1_der", "asn1_der",
"bs58", "bs58",
@ -3011,7 +3011,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-core-derive" name = "libp2p-core-derive"
version = "0.20.2" version = "0.20.2"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"quote", "quote",
"syn", "syn",
@ -3020,7 +3020,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-dns" name = "libp2p-dns"
version = "0.22.0" version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"futures 0.3.6", "futures 0.3.6",
"libp2p-core 0.22.2", "libp2p-core 0.22.2",
@ -3030,7 +3030,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-gossipsub" name = "libp2p-gossipsub"
version = "0.22.1" version = "0.22.1"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"base64 0.12.3", "base64 0.12.3",
"byteorder", "byteorder",
@ -3054,7 +3054,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-identify" name = "libp2p-identify"
version = "0.22.0" version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"futures 0.3.6", "futures 0.3.6",
"libp2p-core 0.22.2", "libp2p-core 0.22.2",
@ -3069,7 +3069,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-mplex" name = "libp2p-mplex"
version = "0.23.0" version = "0.23.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"bytes 0.5.6", "bytes 0.5.6",
"fnv", "fnv",
@ -3084,7 +3084,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-noise" name = "libp2p-noise"
version = "0.24.1" version = "0.24.1"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"bytes 0.5.6", "bytes 0.5.6",
"curve25519-dalek", "curve25519-dalek",
@ -3105,7 +3105,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-swarm" name = "libp2p-swarm"
version = "0.22.0" version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"either", "either",
"futures 0.3.6", "futures 0.3.6",
@ -3120,7 +3120,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-tcp" name = "libp2p-tcp"
version = "0.22.0" version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"futures 0.3.6", "futures 0.3.6",
"futures-timer", "futures-timer",
@ -3135,7 +3135,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-websocket" name = "libp2p-websocket"
version = "0.23.1" version = "0.23.1"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"async-tls", "async-tls",
"either", "either",
@ -3552,7 +3552,7 @@ dependencies = [
[[package]] [[package]]
name = "multistream-select" name = "multistream-select"
version = "0.8.3" version = "0.8.3"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"bytes 0.5.6", "bytes 0.5.6",
"futures 0.3.6", "futures 0.3.6",
@ -3851,7 +3851,7 @@ dependencies = [
[[package]] [[package]]
name = "parity-multiaddr" name = "parity-multiaddr"
version = "0.9.3" version = "0.9.3"
source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"arrayref", "arrayref",
"bs58", "bs58",

View File

@ -42,7 +42,7 @@ regex = "1.3.9"
[dependencies.libp2p] [dependencies.libp2p]
#version = "0.23.0" #version = "0.23.0"
git = "https://github.com/sigp/rust-libp2p" git = "https://github.com/sigp/rust-libp2p"
rev = "fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" rev = "a731aa803d986977c25a77ed2b002d9578f7377c"
default-features = false default-features = false
features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"] features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"]

View File

@ -6,6 +6,9 @@ use crate::Eth2Enr;
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*; use futures::prelude::*;
use handler::{BehaviourHandler, BehaviourHandlerIn, DelegateIn, DelegateOut}; use handler::{BehaviourHandler, BehaviourHandlerIn, DelegateIn, DelegateOut};
use libp2p::gossipsub::subscription_filter::{
MaxCountSubscriptionFilter, WhitelistSubscriptionFilter,
};
use libp2p::{ use libp2p::{
core::{ core::{
connection::{ConnectedPoint, ConnectionId, ListenerId}, connection::{ConnectedPoint, ConnectionId, ListenerId},
@ -25,6 +28,7 @@ use libp2p::{
}; };
use slog::{crit, debug, o, trace, warn}; use slog::{crit, debug, o, trace, warn};
use ssz::Encode; use ssz::Encode;
use std::collections::HashSet;
use std::fs::File; use std::fs::File;
use std::io::Write; use std::io::Write;
use std::path::PathBuf; use std::path::PathBuf;
@ -43,7 +47,8 @@ const MAX_IDENTIFY_ADDRESSES: usize = 10;
/// Identifier of requests sent by a peer. /// Identifier of requests sent by a peer.
pub type PeerRequestId = (ConnectionId, SubstreamId); pub type PeerRequestId = (ConnectionId, SubstreamId);
pub type Gossipsub = GenericGossipsub<MessageData>; pub type SubscriptionFilter = MaxCountSubscriptionFilter<WhitelistSubscriptionFilter>;
pub type Gossipsub = GenericGossipsub<MessageData, SubscriptionFilter>;
pub type GossipsubEvent = GenericGossipsubEvent<MessageData>; pub type GossipsubEvent = GenericGossipsubEvent<MessageData>;
/// The types of events than can be obtained from polling the behaviour. /// The types of events than can be obtained from polling the behaviour.
@ -149,7 +154,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.eth2() .eth2()
.expect("Local ENR must have a fork id"); .expect("Local ENR must have a fork id");
let gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, net_conf.gs_config.clone()) let possible_fork_digests = vec![enr_fork_id.fork_digest];
let filter = MaxCountSubscriptionFilter {
filter: Self::create_whitelist_filter(possible_fork_digests, 64), //TODO change this to a constant
max_subscribed_topics: 200, //TODO change this to a constant
max_subscriptions_per_request: 100, //this is according to the current go implementation
};
let gossipsub = Gossipsub::new_with_subscription_filter(
MessageAuthenticity::Anonymous,
net_conf.gs_config.clone(),
filter,
)
.map_err(|e| format!("Could not construct gossipsub: {:?}", e))?; .map_err(|e| format!("Could not construct gossipsub: {:?}", e))?;
// Temporarily disable scoring until parameters are tested. // Temporarily disable scoring until parameters are tested.
@ -785,6 +801,33 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
waker.wake_by_ref(); waker.wake_by_ref();
} }
} }
/// Creates a whitelist topic filter that covers all possible topics using the given set of
/// possible fork digests.
fn create_whitelist_filter(
possible_fork_digests: Vec<[u8; 4]>,
attestation_subnet_count: u64,
) -> WhitelistSubscriptionFilter {
let mut possible_hashes = HashSet::new();
for fork_digest in possible_fork_digests {
let mut add = |kind| {
let topic: Topic =
GossipTopic::new(kind, GossipEncoding::SSZSnappy, fork_digest).into();
possible_hashes.insert(topic.hash());
};
use GossipKind::*;
add(BeaconBlock);
add(BeaconAggregateAndProof);
add(VoluntaryExit);
add(ProposerSlashing);
add(AttesterSlashing);
for id in 0..attestation_subnet_count {
add(Attestation(SubnetId::new(id)));
}
}
WhitelistSubscriptionFilter(possible_hashes)
}
} }
/// Calls the given function with the given args on all sub behaviours. /// Calls the given function with the given args on all sub behaviours.