Downgrade libp2p (#1817)

## Description

This downgrades the recent libp2p upgrade. 

There were issues with the RPC which prevented syncing of the chain and this upgrade needs to be further investigated.
This commit is contained in:
Age Manning 2020-10-23 09:33:59 +00:00
parent fa2daa7d6c
commit 7870b81ade
8 changed files with 182 additions and 152 deletions

157
Cargo.lock generated
View File

@ -732,6 +732,12 @@ dependencies = [
"pkg-config", "pkg-config",
] ]
[[package]]
name = "c_linked_list"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4964518bd3b4a8190e832886cdc0da9794f12e8e6c1613a9e90ff331c4c8724b"
[[package]] [[package]]
name = "cached_tree_hash" name = "cached_tree_hash"
version = "0.1.0" version = "0.1.0"
@ -1402,7 +1408,7 @@ dependencies = [
[[package]] [[package]]
name = "discv5" name = "discv5"
version = "0.1.0-beta.1" version = "0.1.0-beta.1"
source = "git+https://github.com/sigp/discv5?rev=3ab46fbec53a0bb9da53340e10b9daa1a3effc9f#3ab46fbec53a0bb9da53340e10b9daa1a3effc9f" source = "git+https://github.com/sigp/discv5?rev=fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0#fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0"
dependencies = [ dependencies = [
"aes-ctr", "aes-ctr",
"aes-gcm", "aes-gcm",
@ -1731,7 +1737,7 @@ dependencies = [
"base64 0.13.0", "base64 0.13.0",
"directory", "directory",
"dirs 3.0.1", "dirs 3.0.1",
"discv5 0.1.0-beta.1 (git+https://github.com/sigp/discv5?rev=3ab46fbec53a0bb9da53340e10b9daa1a3effc9f)", "discv5 0.1.0-beta.1 (git+https://github.com/sigp/discv5?rev=fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0)",
"error-chain", "error-chain",
"eth2_ssz", "eth2_ssz",
"eth2_ssz_derive", "eth2_ssz_derive",
@ -2167,6 +2173,12 @@ dependencies = [
"pin-project 0.4.27", "pin-project 0.4.27",
] ]
[[package]]
name = "gcc"
version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2"
[[package]] [[package]]
name = "generator" name = "generator"
version = "0.6.23" version = "0.6.23"
@ -2223,6 +2235,28 @@ dependencies = [
"types", "types",
] ]
[[package]]
name = "get_if_addrs"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abddb55a898d32925f3148bd281174a68eeb68bbfd9a5938a57b18f506ee4ef7"
dependencies = [
"c_linked_list",
"get_if_addrs-sys",
"libc",
"winapi 0.2.8",
]
[[package]]
name = "get_if_addrs-sys"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d04f9fb746cf36b191c00f3ede8bde9c8e64f9f4b05ae2694a9ccf5e3f5ab48"
dependencies = [
"gcc",
"libc",
]
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.1.15" version = "0.1.15"
@ -2719,27 +2753,6 @@ dependencies = [
"unicode-normalization", "unicode-normalization",
] ]
[[package]]
name = "if-addrs"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f12906406f12abf5569643c46b29aec78313dc1537b17dd5c5250169790c4db9"
dependencies = [
"if-addrs-sys",
"libc",
"winapi 0.3.9",
]
[[package]]
name = "if-addrs-sys"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e2556f16544202bcfe0aa5d20a01a6b815f736b136b3ad76dc547ee6b5bb1df"
dependencies = [
"cc",
"libc",
]
[[package]] [[package]]
name = "igd" name = "igd"
version = "0.11.1" version = "0.11.1"
@ -3023,14 +3036,14 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a"
[[package]] [[package]]
name = "libp2p" name = "libp2p"
version = "0.29.1" version = "0.29.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"atomic", "atomic",
"bytes 0.5.6", "bytes 0.5.6",
"futures 0.3.6", "futures 0.3.6",
"lazy_static", "lazy_static",
"libp2p-core 0.23.1", "libp2p-core 0.22.2",
"libp2p-core-derive", "libp2p-core-derive",
"libp2p-dns", "libp2p-dns",
"libp2p-gossipsub", "libp2p-gossipsub",
@ -3041,9 +3054,9 @@ dependencies = [
"libp2p-tcp", "libp2p-tcp",
"libp2p-websocket", "libp2p-websocket",
"multihash", "multihash",
"parity-multiaddr 0.9.3 (git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80)", "parity-multiaddr 0.9.3 (git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c)",
"parking_lot 0.11.0", "parking_lot 0.11.0",
"pin-project 1.0.1", "pin-project 0.4.27",
"smallvec 1.4.2", "smallvec 1.4.2",
"wasm-timer", "wasm-timer",
] ]
@ -3065,7 +3078,7 @@ dependencies = [
"libsecp256k1", "libsecp256k1",
"log 0.4.11", "log 0.4.11",
"multihash", "multihash",
"multistream-select 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)", "multistream-select 0.8.4",
"parity-multiaddr 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)", "parity-multiaddr 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.10.2", "parking_lot 0.10.2",
"pin-project 0.4.27", "pin-project 0.4.27",
@ -3084,8 +3097,8 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-core" name = "libp2p-core"
version = "0.23.1" version = "0.22.2"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"asn1_der", "asn1_der",
"bs58", "bs58",
@ -3098,10 +3111,10 @@ dependencies = [
"libsecp256k1", "libsecp256k1",
"log 0.4.11", "log 0.4.11",
"multihash", "multihash",
"multistream-select 0.8.4 (git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80)", "multistream-select 0.8.3",
"parity-multiaddr 0.9.3 (git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80)", "parity-multiaddr 0.9.3 (git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c)",
"parking_lot 0.11.0", "parking_lot 0.11.0",
"pin-project 1.0.1", "pin-project 0.4.27",
"prost", "prost",
"prost-build", "prost-build",
"rand 0.7.3", "rand 0.7.3",
@ -3118,7 +3131,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-core-derive" name = "libp2p-core-derive"
version = "0.20.2" version = "0.20.2"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"quote", "quote",
"syn", "syn",
@ -3126,27 +3139,27 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-dns" name = "libp2p-dns"
version = "0.23.0" version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"futures 0.3.6", "futures 0.3.6",
"libp2p-core 0.23.1", "libp2p-core 0.22.2",
"log 0.4.11", "log 0.4.11",
] ]
[[package]] [[package]]
name = "libp2p-gossipsub" name = "libp2p-gossipsub"
version = "0.23.0" version = "0.22.1"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"base64 0.13.0", "base64 0.12.3",
"byteorder", "byteorder",
"bytes 0.5.6", "bytes 0.5.6",
"fnv", "fnv",
"futures 0.3.6", "futures 0.3.6",
"futures_codec", "futures_codec",
"hex_fmt", "hex_fmt",
"libp2p-core 0.23.1", "libp2p-core 0.22.2",
"libp2p-swarm", "libp2p-swarm",
"log 0.4.11", "log 0.4.11",
"prost", "prost",
@ -3160,11 +3173,11 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-identify" name = "libp2p-identify"
version = "0.23.0" version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"futures 0.3.6", "futures 0.3.6",
"libp2p-core 0.23.1", "libp2p-core 0.22.2",
"libp2p-swarm", "libp2p-swarm",
"log 0.4.11", "log 0.4.11",
"prost", "prost",
@ -3176,30 +3189,28 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-mplex" name = "libp2p-mplex"
version = "0.23.0" version = "0.23.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"bytes 0.5.6", "bytes 0.5.6",
"fnv",
"futures 0.3.6", "futures 0.3.6",
"futures_codec", "futures_codec",
"libp2p-core 0.23.1", "libp2p-core 0.22.2",
"log 0.4.11", "log 0.4.11",
"nohash-hasher",
"parking_lot 0.11.0", "parking_lot 0.11.0",
"rand 0.7.3",
"smallvec 1.4.2",
"unsigned-varint 0.5.1", "unsigned-varint 0.5.1",
] ]
[[package]] [[package]]
name = "libp2p-noise" name = "libp2p-noise"
version = "0.25.0" version = "0.24.1"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"bytes 0.5.6", "bytes 0.5.6",
"curve25519-dalek", "curve25519-dalek",
"futures 0.3.6", "futures 0.3.6",
"lazy_static", "lazy_static",
"libp2p-core 0.23.1", "libp2p-core 0.22.2",
"log 0.4.11", "log 0.4.11",
"prost", "prost",
"prost-build", "prost-build",
@ -3213,12 +3224,12 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-swarm" name = "libp2p-swarm"
version = "0.23.0" version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"either", "either",
"futures 0.3.6", "futures 0.3.6",
"libp2p-core 0.23.1", "libp2p-core 0.22.2",
"log 0.4.11", "log 0.4.11",
"rand 0.7.3", "rand 0.7.3",
"smallvec 1.4.2", "smallvec 1.4.2",
@ -3228,14 +3239,14 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-tcp" name = "libp2p-tcp"
version = "0.23.0" version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"futures 0.3.6", "futures 0.3.6",
"futures-timer", "futures-timer",
"if-addrs", "get_if_addrs",
"ipnet", "ipnet",
"libp2p-core 0.23.1", "libp2p-core 0.22.2",
"log 0.4.11", "log 0.4.11",
"socket2", "socket2",
"tokio 0.2.22", "tokio 0.2.22",
@ -3243,13 +3254,13 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-websocket" name = "libp2p-websocket"
version = "0.24.0" version = "0.23.1"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"async-tls", "async-tls",
"either", "either",
"futures 0.3.6", "futures 0.3.6",
"libp2p-core 0.23.1", "libp2p-core 0.22.2",
"log 0.4.11", "log 0.4.11",
"quicksink", "quicksink",
"rustls", "rustls",
@ -3668,14 +3679,13 @@ dependencies = [
[[package]] [[package]]
name = "multistream-select" name = "multistream-select"
version = "0.8.4" version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
checksum = "36a6aa6e32fbaf16795142335967214b8564a7a4661eb6dc846ef343a6e00ac1"
dependencies = [ dependencies = [
"bytes 0.5.6", "bytes 0.5.6",
"futures 0.3.6", "futures 0.3.6",
"log 0.4.11", "log 0.4.11",
"pin-project 1.0.1", "pin-project 0.4.27",
"smallvec 1.4.2", "smallvec 1.4.2",
"unsigned-varint 0.5.1", "unsigned-varint 0.5.1",
] ]
@ -3683,7 +3693,8 @@ dependencies = [
[[package]] [[package]]
name = "multistream-select" name = "multistream-select"
version = "0.8.4" version = "0.8.4"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36a6aa6e32fbaf16795142335967214b8564a7a4661eb6dc846ef343a6e00ac1"
dependencies = [ dependencies = [
"bytes 0.5.6", "bytes 0.5.6",
"futures 0.3.6", "futures 0.3.6",
@ -3735,9 +3746,9 @@ dependencies = [
"fnv", "fnv",
"futures 0.3.6", "futures 0.3.6",
"genesis", "genesis",
"get_if_addrs",
"hashset_delay", "hashset_delay",
"hex 0.4.2", "hex 0.4.2",
"if-addrs",
"igd", "igd",
"itertools 0.9.0", "itertools 0.9.0",
"lazy_static", "lazy_static",
@ -3805,12 +3816,6 @@ dependencies = [
"validator_dir", "validator_dir",
] ]
[[package]]
name = "nohash-hasher"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451"
[[package]] [[package]]
name = "nom" name = "nom"
version = "2.2.1" version = "2.2.1"
@ -4000,7 +4005,7 @@ dependencies = [
[[package]] [[package]]
name = "parity-multiaddr" name = "parity-multiaddr"
version = "0.9.3" version = "0.9.3"
source = "git+https://github.com/sigp/rust-libp2p?rev=e42d105cd776b50fc8ef5d726a9e2c799196bd80#e42d105cd776b50fc8ef5d726a9e2c799196bd80" source = "git+https://github.com/sigp/rust-libp2p?rev=a731aa803d986977c25a77ed2b002d9578f7377c#a731aa803d986977c25a77ed2b002d9578f7377c"
dependencies = [ dependencies = [
"arrayref", "arrayref",
"bs58", "bs58",

View File

@ -5,7 +5,7 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
discv5 = { git = "https://github.com/sigp/discv5", rev = "3ab46fbec53a0bb9da53340e10b9daa1a3effc9f", features = ["libp2p"] } discv5 = { git = "https://github.com/sigp/discv5", rev = "fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0", features = ["libp2p"] }
types = { path = "../../consensus/types" } types = { path = "../../consensus/types" }
hashset_delay = { path = "../../common/hashset_delay" } hashset_delay = { path = "../../common/hashset_delay" }
eth2_ssz_types = { path = "../../consensus/ssz_types" } eth2_ssz_types = { path = "../../consensus/ssz_types" }
@ -42,7 +42,7 @@ regex = "1.3.9"
[dependencies.libp2p] [dependencies.libp2p]
#version = "0.23.0" #version = "0.23.0"
git = "https://github.com/sigp/rust-libp2p" git = "https://github.com/sigp/rust-libp2p"
rev = "e42d105cd776b50fc8ef5d726a9e2c799196bd80" rev = "a731aa803d986977c25a77ed2b002d9578f7377c"
default-features = false default-features = false
features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"] features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"]

View File

@ -90,6 +90,8 @@ pub enum BehaviourEvent<TSpec: EthSpec> {
id: MessageId, id: MessageId,
/// The peer from which we received this message, not the peer that published it. /// The peer from which we received this message, not the peer that published it.
source: PeerId, source: PeerId,
/// The topics that this message was sent on.
topics: Vec<TopicHash>,
/// The message itself. /// The message itself.
message: PubsubMessage<TSpec>, message: PubsubMessage<TSpec>,
}, },
@ -302,34 +304,35 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
/// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding. /// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding.
pub fn publish(&mut self, messages: Vec<PubsubMessage<TSpec>>) { pub fn publish(&mut self, messages: Vec<PubsubMessage<TSpec>>) {
for message in messages { for message in messages {
let topic = message.topic(GossipEncoding::default(), self.enr_fork_id.fork_digest); for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) {
match message.encode(GossipEncoding::default()) { match message.encode(GossipEncoding::default()) {
Ok(message_data) => { Ok(message_data) => {
if let Err(e) = self.gossipsub.publish(topic.clone().into(), message_data) { if let Err(e) = self.gossipsub.publish(topic.clone().into(), message_data) {
slog::warn!(self.log, "Could not publish message"; "error" => format!("{:?}", e)); slog::warn!(self.log, "Could not publish message"; "error" => format!("{:?}", e));
// add to metrics // add to metrics
match topic.kind() { match topic.kind() {
GossipKind::Attestation(subnet_id) => { GossipKind::Attestation(subnet_id) => {
if let Some(v) = metrics::get_int_gauge( if let Some(v) = metrics::get_int_gauge(
&metrics::FAILED_ATTESTATION_PUBLISHES_PER_SUBNET, &metrics::FAILED_ATTESTATION_PUBLISHES_PER_SUBNET,
&[&subnet_id.to_string()], &[&subnet_id.to_string()],
) { ) {
v.inc() v.inc()
}; };
} }
kind => { kind => {
if let Some(v) = metrics::get_int_gauge( if let Some(v) = metrics::get_int_gauge(
&metrics::FAILED_PUBLISHES_PER_MAIN_TOPIC, &metrics::FAILED_PUBLISHES_PER_MAIN_TOPIC,
&[&format!("{:?}", kind)], &[&format!("{:?}", kind)],
) { ) {
v.inc() v.inc()
}; };
}
} }
} }
} }
Err(e) => crit!(self.log, "Could not publish message"; "error" => e),
} }
Err(e) => crit!(self.log, "Could not publish message"; "error" => e),
} }
} }
} }
@ -534,7 +537,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
} => { } => {
// Note: We are keeping track here of the peer that sent us the message, not the // Note: We are keeping track here of the peer that sent us the message, not the
// peer that originally published the message. // peer that originally published the message.
match PubsubMessage::decode(&gs_msg.topic, gs_msg.data()) { match PubsubMessage::decode(&gs_msg.topics, gs_msg.data()) {
Err(e) => { Err(e) => {
debug!(self.log, "Could not decode gossipsub message"; "error" => e); debug!(self.log, "Could not decode gossipsub message"; "error" => e);
//reject the message //reject the message
@ -551,6 +554,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
self.add_event(BehaviourEvent::PubsubMessage { self.add_event(BehaviourEvent::PubsubMessage {
id, id,
source: propagation_source, source: propagation_source,
topics: gs_msg.topics,
message: msg, message: msg,
}); });
} }

View File

@ -9,7 +9,7 @@ use crate::EnrExt;
use crate::{NetworkConfig, NetworkGlobals, PeerAction}; use crate::{NetworkConfig, NetworkGlobals, PeerAction};
use futures::prelude::*; use futures::prelude::*;
use libp2p::core::{ use libp2p::core::{
identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed, identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::boxed::Boxed,
}; };
use libp2p::{ use libp2p::{
core, noise, core, noise,
@ -20,6 +20,7 @@ use slog::{crit, debug, info, o, trace, warn};
use ssz::Decode; use ssz::Decode;
use std::fs::File; use std::fs::File;
use std::io::prelude::*; use std::io::prelude::*;
use std::io::{Error, ErrorKind};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -322,7 +323,9 @@ impl<TSpec: EthSpec> Service<TSpec> {
/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, and /// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, and
/// mplex as the multiplexing layer. /// mplex as the multiplexing layer.
fn build_transport(local_private_key: Keypair) -> std::io::Result<Boxed<(PeerId, StreamMuxerBox)>> { fn build_transport(
local_private_key: Keypair,
) -> Result<Boxed<(PeerId, StreamMuxerBox), Error>, Error> {
let transport = libp2p::tcp::TokioTcpConfig::new().nodelay(true); let transport = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
let transport = libp2p::dns::DnsConfig::new(transport)?; let transport = libp2p::dns::DnsConfig::new(transport)?;
#[cfg(feature = "libp2p-websocket")] #[cfg(feature = "libp2p-websocket")]
@ -335,7 +338,10 @@ fn build_transport(local_private_key: Keypair) -> std::io::Result<Boxed<(PeerId,
.upgrade(core::upgrade::Version::V1) .upgrade(core::upgrade::Version::V1)
.authenticate(generate_noise_config(&local_private_key)) .authenticate(generate_noise_config(&local_private_key))
.multiplex(libp2p::mplex::MplexConfig::new()) .multiplex(libp2p::mplex::MplexConfig::new())
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
.timeout(Duration::from_secs(10)) .timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10))
.map_err(|err| Error::new(ErrorKind::Other, err))
.boxed()) .boxed())
} }

View File

@ -74,10 +74,10 @@ fn decompress_snappy(data: &[u8]) -> Result<Vec<u8>, String> {
} }
impl<T: EthSpec> PubsubMessage<T> { impl<T: EthSpec> PubsubMessage<T> {
/// Returns the topic that each pubsub message will be sent across, given a supported /// Returns the topics that each pubsub message will be sent across, given a supported
/// gossipsub encoding and fork version. /// gossipsub encoding and fork version.
pub fn topic(&self, encoding: GossipEncoding, fork_version: [u8; 4]) -> GossipTopic { pub fn topics(&self, encoding: GossipEncoding, fork_version: [u8; 4]) -> Vec<GossipTopic> {
GossipTopic::new(self.kind(), encoding, fork_version) vec![GossipTopic::new(self.kind(), encoding, fork_version)]
} }
/// Returns the kind of gossipsub topic associated with the message. /// Returns the kind of gossipsub topic associated with the message.
@ -104,54 +104,68 @@ impl<T: EthSpec> PubsubMessage<T> {
* Also note that a message can be associated with many topics. As soon as one of the topics is * Also note that a message can be associated with many topics. As soon as one of the topics is
* known we match. If none of the topics are known we return an unknown state. * known we match. If none of the topics are known we return an unknown state.
*/ */
pub fn decode(topic: &TopicHash, data: &MessageData) -> Result<Self, String> { pub fn decode(topics: &[TopicHash], data: &MessageData) -> Result<Self, String> {
match GossipTopic::decode(topic.as_str()) { let mut unknown_topics = Vec::new();
Err(_) => Err(format!("Unknown topic: {}", topic)), for topic in topics {
Ok(gossip_topic) => { match GossipTopic::decode(topic.as_str()) {
let decompressed_data = match gossip_topic.encoding() { Err(_) => {
GossipEncoding::SSZSnappy => data.decompressed.as_ref()?.as_slice(), unknown_topics.push(topic);
}; continue;
// the ssz decoders }
match gossip_topic.kind() { Ok(gossip_topic) => {
GossipKind::BeaconAggregateAndProof => { let decompressed_data = match gossip_topic.encoding() {
let agg_and_proof = GossipEncoding::SSZSnappy => data.decompressed.as_ref()?.as_slice(),
SignedAggregateAndProof::from_ssz_bytes(decompressed_data) };
// the ssz decoders
match gossip_topic.kind() {
GossipKind::BeaconAggregateAndProof => {
let agg_and_proof =
SignedAggregateAndProof::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::AggregateAndProofAttestation(Box::new(
agg_and_proof,
)));
}
GossipKind::Attestation(subnet_id) => {
let attestation = Attestation::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?; .map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::AggregateAndProofAttestation(Box::new( return Ok(PubsubMessage::Attestation(Box::new((
agg_and_proof, *subnet_id,
))) attestation,
} ))));
GossipKind::Attestation(subnet_id) => { }
let attestation = Attestation::from_ssz_bytes(decompressed_data) GossipKind::BeaconBlock => {
.map_err(|e| format!("{:?}", e))?; let beacon_block = SignedBeaconBlock::from_ssz_bytes(decompressed_data)
Ok(PubsubMessage::Attestation(Box::new(( .map_err(|e| format!("{:?}", e))?;
*subnet_id, return Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block)));
attestation, }
)))) GossipKind::VoluntaryExit => {
} let voluntary_exit =
GossipKind::BeaconBlock => { SignedVoluntaryExit::from_ssz_bytes(decompressed_data)
let beacon_block = SignedBeaconBlock::from_ssz_bytes(decompressed_data) .map_err(|e| format!("{:?}", e))?;
.map_err(|e| format!("{:?}", e))?; return Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit)));
Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block))) }
} GossipKind::ProposerSlashing => {
GossipKind::VoluntaryExit => { let proposer_slashing =
let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(decompressed_data) ProposerSlashing::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?; .map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit))) return Ok(PubsubMessage::ProposerSlashing(Box::new(
} proposer_slashing,
GossipKind::ProposerSlashing => { )));
let proposer_slashing = ProposerSlashing::from_ssz_bytes(decompressed_data) }
.map_err(|e| format!("{:?}", e))?; GossipKind::AttesterSlashing => {
Ok(PubsubMessage::ProposerSlashing(Box::new(proposer_slashing))) let attester_slashing =
} AttesterSlashing::from_ssz_bytes(decompressed_data)
GossipKind::AttesterSlashing => { .map_err(|e| format!("{:?}", e))?;
let attester_slashing = AttesterSlashing::from_ssz_bytes(decompressed_data) return Ok(PubsubMessage::AttesterSlashing(Box::new(
.map_err(|e| format!("{:?}", e))?; attester_slashing,
Ok(PubsubMessage::AttesterSlashing(Box::new(attester_slashing))) )));
}
} }
} }
} }
} }
Err(format!("Unknown gossipsub topics: {:?}", unknown_topics))
} }
/// Encodes a `PubsubMessage` based on the topic encodings. The first known encoding is used. If /// Encodes a `PubsubMessage` based on the topic encodings. The first known encoding is used. If

View File

@ -40,4 +40,4 @@ igd = "0.11.1"
itertools = "0.9.0" itertools = "0.9.0"
num_cpus = "1.13.0" num_cpus = "1.13.0"
lru_cache = { path = "../../common/lru_cache" } lru_cache = { path = "../../common/lru_cache" }
if-addrs = "0.6.4" get_if_addrs = "0.5.3"

View File

@ -4,7 +4,7 @@
//! - UPnP //! - UPnP
use crate::{NetworkConfig, NetworkMessage}; use crate::{NetworkConfig, NetworkMessage};
use if_addrs::get_if_addrs; use get_if_addrs::get_if_addrs;
use slog::{debug, info, warn}; use slog::{debug, info, warn};
use std::net::{IpAddr, SocketAddr, SocketAddrV4}; use std::net::{IpAddr, SocketAddr, SocketAddrV4};
use tokio::sync::mpsc; use tokio::sync::mpsc;

View File

@ -428,6 +428,7 @@ fn spawn_service<T: BeaconChainTypes>(
id, id,
source, source,
message, message,
..
} => { } => {
// Update prometheus metrics. // Update prometheus metrics.
expose_receive_metrics(&message); expose_receive_metrics(&message);