Use Gossipsub 1.1 (#1516)

## Issue Addressed

#1172

## Proposed Changes

* updates the libp2p dependency
* small adaptions based on changes in libp2p
* report not just valid messages but also invalid and distinguish between `IGNORE`d messages and `REJECT`ed messages


Co-authored-by: Age Manning <Age@AgeManning.com>
This commit is contained in:
blacktemplar 2020-08-30 13:06:50 +00:00
parent b6340ec495
commit c18d37c202
15 changed files with 332 additions and 142 deletions

77
Cargo.lock generated
View File

@ -309,12 +309,6 @@ dependencies = [
"safemem", "safemem",
] ]
[[package]]
name = "base64"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.12.3" version = "0.12.3"
@ -462,21 +456,6 @@ dependencies = [
"constant_time_eq", "constant_time_eq",
] ]
[[package]]
name = "blake3"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce4f9586c9a3151c4b49b19e82ba163dd073614dd057e53c969e1a4db5b52720"
dependencies = [
"arrayref",
"arrayvec",
"cc",
"cfg-if",
"constant_time_eq",
"crypto-mac 0.8.0",
"digest 0.9.0",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.7.3" version = "0.7.3"
@ -2644,8 +2623,8 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a"
[[package]] [[package]]
name = "libp2p" name = "libp2p"
version = "0.23.0" version = "0.25.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"atomic", "atomic",
"bytes 0.5.6", "bytes 0.5.6",
@ -2662,7 +2641,7 @@ dependencies = [
"libp2p-tcp", "libp2p-tcp",
"libp2p-websocket", "libp2p-websocket",
"multihash", "multihash",
"parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803)", "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df)",
"parking_lot 0.10.2", "parking_lot 0.10.2",
"pin-project", "pin-project",
"smallvec 1.4.2", "smallvec 1.4.2",
@ -2706,7 +2685,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-core" name = "libp2p-core"
version = "0.21.0" version = "0.21.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"asn1_der", "asn1_der",
"bs58", "bs58",
@ -2719,8 +2698,8 @@ dependencies = [
"libsecp256k1", "libsecp256k1",
"log 0.4.11", "log 0.4.11",
"multihash", "multihash",
"multistream-select 0.8.2 (git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803)", "multistream-select 0.8.2 (git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df)",
"parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803)", "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df)",
"parking_lot 0.10.2", "parking_lot 0.10.2",
"pin-project", "pin-project",
"prost", "prost",
@ -2739,7 +2718,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-core-derive" name = "libp2p-core-derive"
version = "0.20.2" version = "0.20.2"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"quote", "quote",
"syn", "syn",
@ -2748,7 +2727,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-dns" name = "libp2p-dns"
version = "0.21.0" version = "0.21.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"futures 0.3.5", "futures 0.3.5",
"libp2p-core 0.21.0", "libp2p-core 0.21.0",
@ -2757,10 +2736,10 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-gossipsub" name = "libp2p-gossipsub"
version = "0.21.0" version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"base64 0.11.0", "base64 0.12.3",
"byteorder", "byteorder",
"bytes 0.5.6", "bytes 0.5.6",
"fnv", "fnv",
@ -2773,16 +2752,16 @@ dependencies = [
"prost", "prost",
"prost-build", "prost-build",
"rand 0.7.3", "rand 0.7.3",
"sha2 0.8.2", "sha2 0.9.1",
"smallvec 1.4.2", "smallvec 1.4.2",
"unsigned-varint 0.4.0", "unsigned-varint 0.5.0",
"wasm-timer", "wasm-timer",
] ]
[[package]] [[package]]
name = "libp2p-identify" name = "libp2p-identify"
version = "0.21.0" version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"futures 0.3.5", "futures 0.3.5",
"libp2p-core 0.21.0", "libp2p-core 0.21.0",
@ -2797,7 +2776,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-mplex" name = "libp2p-mplex"
version = "0.21.0" version = "0.21.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"bytes 0.5.6", "bytes 0.5.6",
"fnv", "fnv",
@ -2812,7 +2791,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-noise" name = "libp2p-noise"
version = "0.23.0" version = "0.23.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"bytes 0.5.6", "bytes 0.5.6",
"curve25519-dalek 2.1.0", "curve25519-dalek 2.1.0",
@ -2832,9 +2811,10 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-swarm" name = "libp2p-swarm"
version = "0.21.0" version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"either",
"futures 0.3.5", "futures 0.3.5",
"libp2p-core 0.21.0", "libp2p-core 0.21.0",
"log 0.4.11", "log 0.4.11",
@ -2847,7 +2827,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-tcp" name = "libp2p-tcp"
version = "0.21.0" version = "0.21.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"futures 0.3.5", "futures 0.3.5",
"futures-timer", "futures-timer",
@ -2862,7 +2842,7 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-websocket" name = "libp2p-websocket"
version = "0.22.0" version = "0.22.0"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"async-tls", "async-tls",
"either", "either",
@ -3221,13 +3201,12 @@ dependencies = [
[[package]] [[package]]
name = "multihash" name = "multihash"
version = "0.11.3" version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51cc1552a982658478dbc22eefb72bb1d4fd1161eb9818f7bbf4347443f07569" checksum = "567122ab6492f49b59def14ecc36e13e64dca4188196dd0cd41f9f3f979f3df6"
dependencies = [ dependencies = [
"blake2b_simd", "blake2b_simd",
"blake2s_simd", "blake2s_simd",
"blake3",
"digest 0.9.0", "digest 0.9.0",
"sha-1 0.9.1", "sha-1 0.9.1",
"sha2 0.9.1", "sha2 0.9.1",
@ -3244,7 +3223,7 @@ checksum = "1255076139a83bb467426e7f8d0134968a8118844faa755985e077cf31850333"
[[package]] [[package]]
name = "multistream-select" name = "multistream-select"
version = "0.8.2" version = "0.8.2"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"bytes 0.5.6", "bytes 0.5.6",
"futures 0.3.5", "futures 0.3.5",
@ -3538,7 +3517,7 @@ dependencies = [
[[package]] [[package]]
name = "parity-multiaddr" name = "parity-multiaddr"
version = "0.9.1" version = "0.9.1"
source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
dependencies = [ dependencies = [
"arrayref", "arrayref",
"bs58", "bs58",
@ -6033,6 +6012,10 @@ name = "unsigned-varint"
version = "0.5.0" version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a98e44fc6af1e18c3a06666d829b4fd8d2714fb2dbffe8ab99d5dc7ea6baa628" checksum = "a98e44fc6af1e18c3a06666d829b4fd8d2714fb2dbffe8ab99d5dc7ea6baa628"
dependencies = [
"bytes 0.5.6",
"futures_codec",
]
[[package]] [[package]]
name = "untrusted" name = "untrusted"

View File

@ -41,7 +41,7 @@ rand = "0.7.3"
[dependencies.libp2p] [dependencies.libp2p]
#version = "0.23.0" #version = "0.23.0"
git = "https://github.com/sigp/rust-libp2p" git = "https://github.com/sigp/rust-libp2p"
rev = "bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" rev = "d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df"
default-features = false default-features = false
features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"] features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"]

View File

@ -131,8 +131,9 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
type InboundProtocol = DelegateInProto<TSpec>; type InboundProtocol = DelegateInProto<TSpec>;
type OutboundProtocol = DelegateOutProto<TSpec>; type OutboundProtocol = DelegateOutProto<TSpec>;
type OutboundOpenInfo = DelegateOutInfo<TSpec>; type OutboundOpenInfo = DelegateOutInfo<TSpec>;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> { fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
let gossip_proto = self.gossip_handler.listen_protocol(); let gossip_proto = self.gossip_handler.listen_protocol();
let rpc_proto = self.rpc_handler.listen_protocol(); let rpc_proto = self.rpc_handler.listen_protocol();
let identify_proto = self.identify_handler.listen_protocol(); let identify_proto = self.identify_handler.listen_protocol();
@ -147,24 +148,27 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
SelectUpgrade::new(rpc_proto.into_upgrade().1, identify_proto.into_upgrade().1), SelectUpgrade::new(rpc_proto.into_upgrade().1, identify_proto.into_upgrade().1),
); );
SubstreamProtocol::new(select).with_timeout(timeout) SubstreamProtocol::new(select, ()).with_timeout(timeout)
} }
fn inject_fully_negotiated_inbound( fn inject_fully_negotiated_inbound(
&mut self, &mut self,
out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output, out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::InboundOpenInfo,
) { ) {
match out { match out {
// Gossipsub // Gossipsub
EitherOutput::First(out) => self.gossip_handler.inject_fully_negotiated_inbound(out), EitherOutput::First(out) => {
self.gossip_handler.inject_fully_negotiated_inbound(out, ())
}
// RPC // RPC
EitherOutput::Second(EitherOutput::First(out)) => { EitherOutput::Second(EitherOutput::First(out)) => {
self.rpc_handler.inject_fully_negotiated_inbound(out) self.rpc_handler.inject_fully_negotiated_inbound(out, ())
} }
// Identify // Identify
EitherOutput::Second(EitherOutput::Second(out)) => { EitherOutput::Second(EitherOutput::Second(out)) => self
self.identify_handler.inject_fully_negotiated_inbound(out) .identify_handler
} .inject_fully_negotiated_inbound(out, ()),
} }
} }
@ -317,10 +321,11 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
event, event,
))); )));
} }
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => {
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(EitherUpgrade::A), protocol: protocol
info: EitherOutput::First(info), .map_upgrade(EitherUpgrade::A)
.map_info(EitherOutput::First),
}); });
} }
Poll::Pending => (), Poll::Pending => (),
@ -333,10 +338,11 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
Poll::Ready(ProtocolsHandlerEvent::Close(event)) => { Poll::Ready(ProtocolsHandlerEvent::Close(event)) => {
return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::RPC(event))); return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::RPC(event)));
} }
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => {
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::A(u))), protocol: protocol
info: EitherOutput::Second(EitherOutput::First(info)), .map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::A(u)))
.map_info(|info| EitherOutput::Second(EitherOutput::First(info))),
}); });
} }
Poll::Pending => (), Poll::Pending => (),
@ -351,10 +357,11 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
Poll::Ready(ProtocolsHandlerEvent::Close(event)) => { Poll::Ready(ProtocolsHandlerEvent::Close(event)) => {
return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Identify(event))); return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Identify(event)));
} }
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () }) => { Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => {
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::B(u))), protocol: protocol
info: EitherOutput::Second(EitherOutput::Second(())), .map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::B(u)))
.map_info(|_| EitherOutput::Second(EitherOutput::Second(()))),
}); });
} }
Poll::Pending => (), Poll::Pending => (),

View File

@ -54,16 +54,18 @@ impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
type InboundProtocol = DelegateInProto<TSpec>; type InboundProtocol = DelegateInProto<TSpec>;
type OutboundProtocol = DelegateOutProto<TSpec>; type OutboundProtocol = DelegateOutProto<TSpec>;
type OutboundOpenInfo = DelegateOutInfo<TSpec>; type OutboundOpenInfo = DelegateOutInfo<TSpec>;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> { fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
self.delegate.listen_protocol() self.delegate.listen_protocol()
} }
fn inject_fully_negotiated_inbound( fn inject_fully_negotiated_inbound(
&mut self, &mut self,
out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output, out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::InboundOpenInfo,
) { ) {
self.delegate.inject_fully_negotiated_inbound(out) self.delegate.inject_fully_negotiated_inbound(out, ())
} }
fn inject_fully_negotiated_outbound( fn inject_fully_negotiated_outbound(
@ -127,11 +129,8 @@ impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
Poll::Ready(ProtocolsHandlerEvent::Close(err)) => { Poll::Ready(ProtocolsHandlerEvent::Close(err)) => {
return Poll::Ready(ProtocolsHandlerEvent::Close(err)) return Poll::Ready(ProtocolsHandlerEvent::Close(err))
} }
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => {
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol });
protocol,
info,
});
} }
Poll::Pending => (), Poll::Pending => (),
} }

View File

@ -11,7 +11,10 @@ use libp2p::{
identity::Keypair, identity::Keypair,
Multiaddr, Multiaddr,
}, },
gossipsub::{Gossipsub, GossipsubEvent, MessageAuthenticity, MessageId}, gossipsub::{
Gossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity,
MessageId,
},
identify::{Identify, IdentifyEvent}, identify::{Identify, IdentifyEvent},
swarm::{ swarm::{
NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters, NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters,
@ -94,15 +97,19 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
let meta_data = load_or_build_metadata(&net_conf.network_dir, &log); let meta_data = load_or_build_metadata(&net_conf.network_dir, &log);
// TODO: Until other clients support no author, we will use a 0 peer_id as our author. let gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, net_conf.gs_config.clone())
let message_author = PeerId::from_bytes(vec![0, 1, 0]).expect("Valid peer id"); .map_err(|e| format!("Could not construct gossipsub: {:?}", e))?;
// Temporarily disable scoring until parameters are tested.
/*
gossipsub
.with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default())
.expect("Valid score params and thresholds");
*/
Ok(Behaviour { Ok(Behaviour {
eth2_rpc: RPC::new(log.clone()), eth2_rpc: RPC::new(log.clone()),
gossipsub: Gossipsub::new( gossipsub,
MessageAuthenticity::Author(message_author),
net_conf.gs_config.clone(),
),
identify, identify,
peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log) peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)
.await?, .await?,
@ -147,6 +154,10 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
GossipEncoding::default(), GossipEncoding::default(),
self.enr_fork_id.fork_digest, self.enr_fork_id.fork_digest,
); );
// TODO: Implement scoring
// let topic: Topic = gossip_topic.into();
// self.gossipsub.set_topic_params(t.hash(), TopicScoreParams::default());
self.subscribe(gossip_topic) self.subscribe(gossip_topic)
} }
@ -168,6 +179,12 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
GossipEncoding::default(), GossipEncoding::default(),
self.enr_fork_id.fork_digest, self.enr_fork_id.fork_digest,
); );
// TODO: Implement scoring
/*
let t: Topic = topic.clone().into();
self.gossipsub
.set_topic_params(t.hash(), TopicScoreParams::default());
*/
self.subscribe(topic) self.subscribe(topic)
} }
@ -189,9 +206,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.write() .write()
.insert(topic.clone()); .insert(topic.clone());
let topic_str: String = topic.clone().into(); let topic: Topic = topic.into();
debug!(self.log, "Subscribed to topic"; "topic" => topic_str);
self.gossipsub.subscribe(topic.into()) match self.gossipsub.subscribe(&topic) {
Err(_) => {
warn!(self.log, "Failed to subscribe to topic"; "topic" => topic.to_string());
false
}
Ok(v) => {
debug!(self.log, "Subscribed to topic"; "topic" => topic.to_string());
v
}
}
} }
/// Unsubscribe from a gossipsub topic. /// Unsubscribe from a gossipsub topic.
@ -201,8 +227,20 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.gossipsub_subscriptions .gossipsub_subscriptions
.write() .write()
.remove(&topic); .remove(&topic);
// unsubscribe from the topic // unsubscribe from the topic
self.gossipsub.unsubscribe(topic.into()) let topic: Topic = topic.into();
match self.gossipsub.unsubscribe(&topic) {
Err(_) => {
warn!(self.log, "Failed to unsubscribe from topic"; "topic" => topic.to_string());
false
}
Ok(v) => {
debug!(self.log, "Unsubscribed to topic"; "topic" => topic.to_string());
v
}
}
} }
/// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding. /// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding.
@ -211,7 +249,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) { for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) {
match message.encode(GossipEncoding::default()) { match message.encode(GossipEncoding::default()) {
Ok(message_data) => { Ok(message_data) => {
if let Err(e) = self.gossipsub.publish(&topic.into(), message_data) { if let Err(e) = self.gossipsub.publish(topic.into(), message_data) {
slog::warn!(self.log, "Could not publish message"; "error" => format!("{:?}", e)); slog::warn!(self.log, "Could not publish message"; "error" => format!("{:?}", e));
} }
} }
@ -221,11 +259,21 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
} }
} }
/// Forwards a message that is waiting in gossipsub's mcache. Messages are only propagated /// Informs the gossipsub about the result of a message validation.
/// once validated by the beacon chain. /// If the message is valid it will get propagated by gossipsub.
pub fn validate_message(&mut self, propagation_source: &PeerId, message_id: MessageId) { pub fn report_message_validation_result(
self.gossipsub &mut self,
.validate_message(&message_id, propagation_source); propagation_source: &PeerId,
message_id: MessageId,
validation_result: MessageAcceptance,
) {
if let Err(e) = self.gossipsub.report_message_validation_result(
&message_id,
propagation_source,
validation_result,
) {
warn!(self.log, "Failed to report message validation"; "message_id" => message_id.to_string(), "peer_id" => propagation_source.to_string(), "error" => format!("{:?}", e));
}
} }
/* Eth2 RPC behaviour functions */ /* Eth2 RPC behaviour functions */
@ -392,11 +440,25 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
fn on_gossip_event(&mut self, event: GossipsubEvent) { fn on_gossip_event(&mut self, event: GossipsubEvent) {
match event { match event {
GossipsubEvent::Message(propagation_source, id, gs_msg) => { GossipsubEvent::Message {
propagation_source,
message_id: id,
message: gs_msg,
} => {
// Note: We are keeping track here of the peer that sent us the message, not the // Note: We are keeping track here of the peer that sent us the message, not the
// peer that originally published the message. // peer that originally published the message.
match PubsubMessage::decode(&gs_msg.topics, &gs_msg.data) { match PubsubMessage::decode(&gs_msg.topics, &gs_msg.data) {
Err(e) => debug!(self.log, "Could not decode gossipsub message"; "error" => e), Err(e) => {
debug!(self.log, "Could not decode gossipsub message"; "error" => e);
//reject the message
if let Err(e) = self.gossipsub.report_message_validation_result(
&id,
&propagation_source,
MessageAcceptance::Reject,
) {
warn!(self.log, "Failed to report message validation"; "message_id" => id.to_string(), "peer_id" => propagation_source.to_string(), "error" => format!("{:?}", e));
}
}
Ok(msg) => { Ok(msg) => {
// Notify the network // Notify the network
self.add_event(BehaviourEvent::PubsubMessage { self.add_event(BehaviourEvent::PubsubMessage {

View File

@ -99,8 +99,8 @@ impl Default for Config {
let gs_config = GossipsubConfigBuilder::new() let gs_config = GossipsubConfigBuilder::new()
.max_transmit_size(GOSSIP_MAX_SIZE) .max_transmit_size(GOSSIP_MAX_SIZE)
.heartbeat_interval(Duration::from_millis(700)) .heartbeat_interval(Duration::from_millis(700))
.mesh_n(6) .mesh_n(8)
.mesh_n_low(5) .mesh_n_low(6)
.mesh_n_high(12) .mesh_n_high(12)
.gossip_lazy(6) .gossip_lazy(6)
.fanout_ttl(Duration::from_secs(60)) .fanout_ttl(Duration::from_secs(60))
@ -111,7 +111,8 @@ impl Default for Config {
// prevent duplicates for 550 heartbeats(700millis * 550) = 385 secs // prevent duplicates for 550 heartbeats(700millis * 550) = 385 secs
.duplicate_cache_time(Duration::from_secs(385)) .duplicate_cache_time(Duration::from_secs(385))
.message_id_fn(gossip_message_id) .message_id_fn(gossip_message_id)
.build(); .build()
.expect("valid gossipsub configuration");
// discv5 configuration // discv5 configuration
let discv5_config = Discv5ConfigBuilder::new() let discv5_config = Discv5ConfigBuilder::new()

View File

@ -19,7 +19,7 @@ pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response};
pub use config::Config as NetworkConfig; pub use config::Config as NetworkConfig;
pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr}; pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr};
pub use discv5; pub use discv5;
pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::gossipsub::{MessageAcceptance, MessageId, Topic, TopicHash};
pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{core::ConnectedPoint, PeerId, Swarm};
pub use libp2p::{multiaddr, Multiaddr}; pub use libp2p::{multiaddr, Multiaddr};
pub use metrics::scrape_discovery_metrics; pub use metrics::scrape_discovery_metrics;

View File

@ -81,7 +81,7 @@ where
TSpec: EthSpec, TSpec: EthSpec,
{ {
/// The upgrade for inbound substreams. /// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>>, listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>, ()>,
/// Errors occurring on outbound and inbound connections queued for reporting back. /// Errors occurring on outbound and inbound connections queued for reporting back.
pending_errors: Vec<HandlerErr>, pending_errors: Vec<HandlerErr>,
@ -225,7 +225,10 @@ impl<TSpec> RPCHandler<TSpec>
where where
TSpec: EthSpec, TSpec: EthSpec,
{ {
pub fn new(listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>>, log: &slog::Logger) -> Self { pub fn new(
listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>, ()>,
log: &slog::Logger,
) -> Self {
RPCHandler { RPCHandler {
listen_protocol, listen_protocol,
pending_errors: Vec::new(), pending_errors: Vec::new(),
@ -249,7 +252,7 @@ where
/// ///
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound /// > **Note**: If you modify the protocol, modifications will only applies to future inbound
/// > substreams, not the ones already being negotiated. /// > substreams, not the ones already being negotiated.
pub fn listen_protocol_ref(&self) -> &SubstreamProtocol<RPCProtocol<TSpec>> { pub fn listen_protocol_ref(&self) -> &SubstreamProtocol<RPCProtocol<TSpec>, ()> {
&self.listen_protocol &self.listen_protocol
} }
@ -257,7 +260,7 @@ where
/// ///
/// > **Note**: If you modify the protocol, modifications will only apply to future inbound /// > **Note**: If you modify the protocol, modifications will only apply to future inbound
/// > substreams, not the ones already being negotiated. /// > substreams, not the ones already being negotiated.
pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<RPCProtocol<TSpec>> { pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<RPCProtocol<TSpec>, ()> {
&mut self.listen_protocol &mut self.listen_protocol
} }
@ -344,14 +347,16 @@ where
type InboundProtocol = RPCProtocol<TSpec>; type InboundProtocol = RPCProtocol<TSpec>;
type OutboundProtocol = RPCRequest<TSpec>; type OutboundProtocol = RPCRequest<TSpec>;
type OutboundOpenInfo = (RequestId, RPCRequest<TSpec>); // Keep track of the id and the request type OutboundOpenInfo = (RequestId, RPCRequest<TSpec>); // Keep track of the id and the request
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> { fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
self.listen_protocol.clone() self.listen_protocol.clone()
} }
fn inject_fully_negotiated_inbound( fn inject_fully_negotiated_inbound(
&mut self, &mut self,
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output, substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::InboundOpenInfo,
) { ) {
// only accept new peer requests when active // only accept new peer requests when active
if !matches!(self.state, HandlerState::Active) { if !matches!(self.state, HandlerState::Active) {
@ -863,8 +868,7 @@ where
let (id, req) = self.dial_queue.remove(0); let (id, req) = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit(); self.dial_queue.shrink_to_fit();
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()), protocol: SubstreamProtocol::new(req.clone(), ()).map_info(|()| (id, req)),
info: (id, req),
}); });
} }
Poll::Pending Poll::Pending

View File

@ -169,9 +169,12 @@ where
fn new_handler(&mut self) -> Self::ProtocolsHandler { fn new_handler(&mut self) -> Self::ProtocolsHandler {
RPCHandler::new( RPCHandler::new(
SubstreamProtocol::new(RPCProtocol { SubstreamProtocol::new(
phantom: PhantomData, RPCProtocol {
}), phantom: PhantomData,
},
(),
),
&self.log, &self.log,
) )
} }

View File

@ -1,4 +1,4 @@
use libp2p::gossipsub::Topic; use libp2p::gossipsub::IdentTopic as Topic;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use types::SubnetId; use types::SubnetId;
@ -139,7 +139,7 @@ impl GossipTopic {
impl Into<Topic> for GossipTopic { impl Into<Topic> for GossipTopic {
fn into(self) -> Topic { fn into(self) -> Topic {
Topic::new(self.into()) Topic::new(self)
} }
} }

View File

@ -10,6 +10,7 @@ use std::time::Duration;
use types::{EnrForkId, MinimalEthSpec}; use types::{EnrForkId, MinimalEthSpec};
type E = MinimalEthSpec; type E = MinimalEthSpec;
use libp2p::gossipsub::GossipsubConfigBuilder;
use tempdir::TempDir; use tempdir::TempDir;
pub struct Libp2pInstance(LibP2PService<E>, exit_future::Signal); pub struct Libp2pInstance(LibP2PService<E>, exit_future::Signal);
@ -83,8 +84,11 @@ pub fn build_config(port: u16, mut boot_nodes: Vec<Enr>) -> NetworkConfig {
config.boot_nodes_enr.append(&mut boot_nodes); config.boot_nodes_enr.append(&mut boot_nodes);
config.network_dir = path.into_path(); config.network_dir = path.into_path();
// Reduce gossipsub heartbeat parameters // Reduce gossipsub heartbeat parameters
config.gs_config.heartbeat_initial_delay = Duration::from_millis(500); config.gs_config = GossipsubConfigBuilder::from(config.gs_config)
config.gs_config.heartbeat_interval = Duration::from_millis(500); .heartbeat_initial_delay(Duration::from_millis(500))
.heartbeat_interval(Duration::from_millis(500))
.build()
.unwrap();
config config
} }

View File

@ -7,7 +7,7 @@ use beacon_chain::{
attestation_verification::Error as AttnError, observed_operations::ObservationOutcome, attestation_verification::Error as AttnError, observed_operations::ObservationOutcome,
BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError,
}; };
use eth2_libp2p::{MessageId, PeerId}; use eth2_libp2p::{MessageAcceptance, MessageId, PeerId};
use slog::{crit, debug, error, info, trace, warn, Logger}; use slog::{crit, debug, error, info, trace, warn, Logger};
use ssz::Encode; use ssz::Encode;
use std::sync::Arc; use std::sync::Arc;
@ -51,6 +51,7 @@ impl<T: BeaconChainTypes> Worker<T> {
Err(e) => { Err(e) => {
self.handle_attestation_verification_failure( self.handle_attestation_verification_failure(
peer_id, peer_id,
message_id,
beacon_block_root, beacon_block_root,
"unaggregated", "unaggregated",
e, e,
@ -61,7 +62,7 @@ impl<T: BeaconChainTypes> Worker<T> {
// Indicate to the `Network` service that this message is valid and can be // Indicate to the `Network` service that this message is valid and can be
// propagated on the gossip network. // propagated on the gossip network.
self.propagate_gossip_message(message_id, peer_id.clone()); self.propagate_validation_result(message_id, peer_id.clone(), MessageAcceptance::Accept);
if !should_import { if !should_import {
return; return;
@ -124,8 +125,10 @@ impl<T: BeaconChainTypes> Worker<T> {
{ {
Ok(aggregate) => aggregate, Ok(aggregate) => aggregate,
Err(e) => { Err(e) => {
// Report the failure to gossipsub
self.handle_attestation_verification_failure( self.handle_attestation_verification_failure(
peer_id, peer_id,
message_id,
beacon_block_root, beacon_block_root,
"aggregated", "aggregated",
e, e,
@ -136,7 +139,7 @@ impl<T: BeaconChainTypes> Worker<T> {
// Indicate to the `Network` service that this message is valid and can be // Indicate to the `Network` service that this message is valid and can be
// propagated on the gossip network. // propagated on the gossip network.
self.propagate_gossip_message(message_id, peer_id.clone()); self.propagate_validation_result(message_id, peer_id.clone(), MessageAcceptance::Accept);
metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL); metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL);
@ -195,26 +198,43 @@ impl<T: BeaconChainTypes> Worker<T> {
"slot" => verified_block.block.slot(), "slot" => verified_block.block.slot(),
"hash" => verified_block.block_root.to_string() "hash" => verified_block.block_root.to_string()
); );
self.propagate_gossip_message(message_id, peer_id.clone()); self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Accept,
);
verified_block verified_block
} }
Err(BlockError::ParentUnknown(block)) => { Err(BlockError::ParentUnknown(block)) => {
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block)); self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
return; return;
} }
Err(BlockError::BlockIsAlreadyKnown) => { Err(e @ BlockError::FutureSlot { .. })
debug!( | Err(e @ BlockError::WouldRevertFinalizedSlot { .. })
self.log, | Err(e @ BlockError::BlockIsAlreadyKnown)
"Gossip block is already known"; | Err(e @ BlockError::RepeatProposal { .. })
); | Err(e @ BlockError::NotFinalizedDescendant { .. })
| Err(e @ BlockError::BeaconChainError(_)) => {
warn!(self.log, "Could not verify block for gossip, ignoring the block";
"error" => e.to_string());
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return; return;
} }
Err(e) => { Err(e @ BlockError::StateRootMismatch { .. })
warn!( | Err(e @ BlockError::IncorrectBlockProposer { .. })
self.log, | Err(e @ BlockError::BlockSlotLimitReached)
"Could not verify block for gossip"; | Err(e @ BlockError::ProposalSignatureInvalid)
"error" => format!("{:?}", e) | Err(e @ BlockError::NonLinearSlots)
); | Err(e @ BlockError::UnknownValidator(_))
| Err(e @ BlockError::PerBlockProcessingError(_))
| Err(e @ BlockError::NonLinearParentRoots)
| Err(e @ BlockError::BlockIsNotLaterThanParent { .. })
| Err(e @ BlockError::InvalidSignature)
| Err(e @ BlockError::TooManySkippedSlots { .. })
| Err(e @ BlockError::GenesisBlock) => {
warn!(self.log, "Could not verify block for gossip, rejecting the block";
"error" => e.to_string());
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
return; return;
} }
}; };
@ -290,6 +310,11 @@ impl<T: BeaconChainTypes> Worker<T> {
let exit = match self.chain.verify_voluntary_exit_for_gossip(voluntary_exit) { let exit = match self.chain.verify_voluntary_exit_for_gossip(voluntary_exit) {
Ok(ObservationOutcome::New(exit)) => exit, Ok(ObservationOutcome::New(exit)) => exit,
Ok(ObservationOutcome::AlreadyKnown) => { Ok(ObservationOutcome::AlreadyKnown) => {
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Ignore,
);
debug!( debug!(
self.log, self.log,
"Dropping exit for already exiting validator"; "Dropping exit for already exiting validator";
@ -306,13 +331,16 @@ impl<T: BeaconChainTypes> Worker<T> {
"peer" => peer_id.to_string(), "peer" => peer_id.to_string(),
"error" => format!("{:?}", e) "error" => format!("{:?}", e)
); );
// These errors occur due to a fault in the beacon chain. It is not necessarily
// the fault on the peer.
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return; return;
} }
}; };
metrics::inc_counter(&metrics::BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL); metrics::inc_counter(&metrics::BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL);
self.propagate_gossip_message(message_id, peer_id); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
self.chain.import_voluntary_exit(exit); self.chain.import_voluntary_exit(exit);
debug!(self.log, "Successfully imported voluntary exit"); debug!(self.log, "Successfully imported voluntary exit");
@ -341,9 +369,12 @@ impl<T: BeaconChainTypes> Worker<T> {
"validator_index" => validator_index, "validator_index" => validator_index,
"peer" => peer_id.to_string() "peer" => peer_id.to_string()
); );
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return; return;
} }
Err(e) => { Err(e) => {
// This is likely a fault with the beacon chain and not necessarily a
// malicious message from the peer.
debug!( debug!(
self.log, self.log,
"Dropping invalid proposer slashing"; "Dropping invalid proposer slashing";
@ -351,13 +382,14 @@ impl<T: BeaconChainTypes> Worker<T> {
"peer" => peer_id.to_string(), "peer" => peer_id.to_string(),
"error" => format!("{:?}", e) "error" => format!("{:?}", e)
); );
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return; return;
} }
}; };
metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL); metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL);
self.propagate_gossip_message(message_id, peer_id); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
self.chain.import_proposer_slashing(slashing); self.chain.import_proposer_slashing(slashing);
debug!(self.log, "Successfully imported proposer slashing"); debug!(self.log, "Successfully imported proposer slashing");
@ -383,6 +415,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"reason" => "Slashings already known for all slashed validators", "reason" => "Slashings already known for all slashed validators",
"peer" => peer_id.to_string() "peer" => peer_id.to_string()
); );
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return; return;
} }
Err(e) => { Err(e) => {
@ -392,13 +425,14 @@ impl<T: BeaconChainTypes> Worker<T> {
"peer" => peer_id.to_string(), "peer" => peer_id.to_string(),
"error" => format!("{:?}", e) "error" => format!("{:?}", e)
); );
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return; return;
} }
}; };
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL); metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL);
self.propagate_gossip_message(message_id, peer_id); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
if let Err(e) = self.chain.import_attester_slashing(slashing) { if let Err(e) = self.chain.import_attester_slashing(slashing) {
debug!(self.log, "Error importing attester slashing"; "error" => format!("{:?}", e)); debug!(self.log, "Error importing attester slashing"; "error" => format!("{:?}", e));
@ -441,11 +475,19 @@ impl<T: BeaconChainTypes> Worker<T> {
/// the gossip network. /// the gossip network.
/// ///
/// Creates a log if there is an interal error. /// Creates a log if there is an interal error.
fn propagate_gossip_message(&self, message_id: MessageId, peer_id: PeerId) { /// Propagates the result of the validation fot the given message to the network. If the result
/// is valid the message gets forwarded to other peers.
fn propagate_validation_result(
&self,
message_id: MessageId,
propagation_source: PeerId,
validation_result: MessageAcceptance,
) {
self.network_tx self.network_tx
.send(NetworkMessage::Validate { .send(NetworkMessage::ValidationResult {
propagation_source: peer_id, propagation_source,
message_id, message_id,
validation_result,
}) })
.unwrap_or_else(|_| { .unwrap_or_else(|_| {
warn!( warn!(
@ -469,6 +511,7 @@ impl<T: BeaconChainTypes> Worker<T> {
pub fn handle_attestation_verification_failure( pub fn handle_attestation_verification_failure(
&self, &self,
peer_id: PeerId, peer_id: PeerId,
message_id: MessageId,
beacon_block_root: Hash256, beacon_block_root: Hash256,
attestation_type: &str, attestation_type: &str,
error: AttnError, error: AttnError,
@ -485,6 +528,11 @@ impl<T: BeaconChainTypes> Worker<T> {
* *
* The peer has published an invalid consensus message, _only_ if we trust our own clock. * The peer has published an invalid consensus message, _only_ if we trust our own clock.
*/ */
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => { AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => {
/* /*
@ -492,6 +540,11 @@ impl<T: BeaconChainTypes> Worker<T> {
* *
* The peer has published an invalid consensus message. * The peer has published an invalid consensus message.
*/ */
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::EmptyAggregationBitfield => { AttnError::EmptyAggregationBitfield => {
/* /*
@ -500,10 +553,12 @@ impl<T: BeaconChainTypes> Worker<T> {
* Whilst we don't gossip this attestation, this act is **not** a clear * Whilst we don't gossip this attestation, this act is **not** a clear
* violation of the spec nor indication of fault. * violation of the spec nor indication of fault.
* *
* This may change soon. Reference:
*
* https://github.com/ethereum/eth2.0-specs/pull/1732
*/ */
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::AggregatorPubkeyUnknown(_) => { AttnError::AggregatorPubkeyUnknown(_) => {
/* /*
@ -519,6 +574,11 @@ impl<T: BeaconChainTypes> Worker<T> {
* *
* The peer has published an invalid consensus message. * The peer has published an invalid consensus message.
*/ */
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::AggregatorNotInCommittee { .. } => { AttnError::AggregatorNotInCommittee { .. } => {
/* /*
@ -534,6 +594,11 @@ impl<T: BeaconChainTypes> Worker<T> {
* *
* The peer has published an invalid consensus message. * The peer has published an invalid consensus message.
*/ */
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::AttestationAlreadyKnown { .. } => { AttnError::AttestationAlreadyKnown { .. } => {
/* /*
@ -549,6 +614,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"block" => format!("{}", beacon_block_root), "block" => format!("{}", beacon_block_root),
"type" => format!("{:?}", attestation_type), "type" => format!("{:?}", attestation_type),
); );
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return; return;
} }
AttnError::AggregatorAlreadyKnown(_) => { AttnError::AggregatorAlreadyKnown(_) => {
@ -565,6 +631,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"block" => format!("{}", beacon_block_root), "block" => format!("{}", beacon_block_root),
"type" => format!("{:?}", attestation_type), "type" => format!("{:?}", attestation_type),
); );
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return; return;
} }
AttnError::PriorAttestationKnown { .. } => { AttnError::PriorAttestationKnown { .. } => {
@ -580,6 +647,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"block" => format!("{}", beacon_block_root), "block" => format!("{}", beacon_block_root),
"type" => format!("{:?}", attestation_type), "type" => format!("{:?}", attestation_type),
); );
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return; return;
} }
AttnError::ValidatorIndexTooHigh(_) => { AttnError::ValidatorIndexTooHigh(_) => {
@ -589,6 +657,11 @@ impl<T: BeaconChainTypes> Worker<T> {
* *
* The peer has published an invalid consensus message. * The peer has published an invalid consensus message.
*/ */
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::UnknownHeadBlock { beacon_block_root } => { AttnError::UnknownHeadBlock { beacon_block_root } => {
// Note: its a little bit unclear as to whether or not this block is unknown or // Note: its a little bit unclear as to whether or not this block is unknown or
@ -605,7 +678,10 @@ impl<T: BeaconChainTypes> Worker<T> {
); );
// we don't know the block, get the sync manager to handle the block lookup // we don't know the block, get the sync manager to handle the block lookup
self.sync_tx self.sync_tx
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) .send(SyncMessage::UnknownBlockHash(
peer_id.clone(),
*beacon_block_root,
))
.unwrap_or_else(|_| { .unwrap_or_else(|_| {
warn!( warn!(
self.log, self.log,
@ -613,6 +689,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"msg" => "UnknownBlockHash" "msg" => "UnknownBlockHash"
) )
}); });
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return; return;
} }
AttnError::UnknownTargetRoot(_) => { AttnError::UnknownTargetRoot(_) => {
@ -632,6 +709,11 @@ impl<T: BeaconChainTypes> Worker<T> {
* *
* The peer has published an invalid consensus message. * The peer has published an invalid consensus message.
*/ */
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::BadTargetEpoch => { AttnError::BadTargetEpoch => {
/* /*
@ -640,6 +722,11 @@ impl<T: BeaconChainTypes> Worker<T> {
* *
* The peer has published an invalid consensus message. * The peer has published an invalid consensus message.
*/ */
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::NoCommitteeForSlotAndIndex { .. } => { AttnError::NoCommitteeForSlotAndIndex { .. } => {
/* /*
@ -647,6 +734,11 @@ impl<T: BeaconChainTypes> Worker<T> {
* *
* The peer has published an invalid consensus message. * The peer has published an invalid consensus message.
*/ */
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::NotExactlyOneAggregationBitSet(_) => { AttnError::NotExactlyOneAggregationBitSet(_) => {
/* /*
@ -654,6 +746,11 @@ impl<T: BeaconChainTypes> Worker<T> {
* *
* The peer has published an invalid consensus message. * The peer has published an invalid consensus message.
*/ */
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::AttestsToFutureBlock { .. } => { AttnError::AttestsToFutureBlock { .. } => {
/* /*
@ -661,6 +758,11 @@ impl<T: BeaconChainTypes> Worker<T> {
* *
* The peer has published an invalid consensus message. * The peer has published an invalid consensus message.
*/ */
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::InvalidSubnetId { received, expected } => { AttnError::InvalidSubnetId { received, expected } => {
@ -672,7 +774,12 @@ impl<T: BeaconChainTypes> Worker<T> {
"Received attestation on incorrect subnet"; "Received attestation on incorrect subnet";
"expected" => format!("{:?}", expected), "expected" => format!("{:?}", expected),
"received" => format!("{:?}", received), "received" => format!("{:?}", received),
) );
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::Invalid(_) => { AttnError::Invalid(_) => {
/* /*
@ -680,6 +787,11 @@ impl<T: BeaconChainTypes> Worker<T> {
* *
* The peer has published an invalid consensus message. * The peer has published an invalid consensus message.
*/ */
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::TooManySkippedSlots { AttnError::TooManySkippedSlots {
head_block_slot, head_block_slot,
@ -695,7 +807,14 @@ impl<T: BeaconChainTypes> Worker<T> {
"Rejected long skip slot attestation"; "Rejected long skip slot attestation";
"head_block_slot" => head_block_slot, "head_block_slot" => head_block_slot,
"attestation_slot" => attestation_slot, "attestation_slot" => attestation_slot,
) );
// In this case we wish to penalize gossipsub peers that do this to avoid future
// attestations that have too many skip slots.
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Reject,
);
} }
AttnError::BeaconChainError(e) => { AttnError::BeaconChainError(e) => {
/* /*
@ -711,6 +830,11 @@ impl<T: BeaconChainTypes> Worker<T> {
"peer_id" => peer_id.to_string(), "peer_id" => peer_id.to_string(),
"error" => format!("{:?}", e), "error" => format!("{:?}", e),
); );
self.propagate_validation_result(
message_id,
peer_id.clone(),
MessageAcceptance::Ignore,
);
} }
} }

View File

@ -18,8 +18,6 @@ use types::{
SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId,
}; };
//TODO: Rate limit requests
/// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it.
/// Otherwise we queue it. /// Otherwise we queue it.
pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;

View File

@ -6,12 +6,12 @@ use crate::{
}; };
use crate::{error, metrics}; use crate::{error, metrics};
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{ use eth2_libp2p::{
rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId}, rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId},
Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, Request, Response, Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, Request, Response,
}; };
use eth2_libp2p::{BehaviourEvent, MessageId, NetworkGlobals, PeerId}; use eth2_libp2p::{BehaviourEvent, MessageId, NetworkGlobals, PeerId};
use eth2_libp2p::{MessageAcceptance, Service as LibP2PService};
use futures::prelude::*; use futures::prelude::*;
use rest_types::ValidatorSubscription; use rest_types::ValidatorSubscription;
use slog::{debug, error, info, o, trace, warn}; use slog::{debug, error, info, o, trace, warn};
@ -55,11 +55,13 @@ pub enum NetworkMessage<T: EthSpec> {
/// Publish a list of messages to the gossipsub protocol. /// Publish a list of messages to the gossipsub protocol.
Publish { messages: Vec<PubsubMessage<T>> }, Publish { messages: Vec<PubsubMessage<T>> },
/// Validates a received gossipsub message. This will propagate the message on the network. /// Validates a received gossipsub message. This will propagate the message on the network.
Validate { ValidationResult {
/// The peer that sent us the message. We don't send back to this peer. /// The peer that sent us the message. We don't send back to this peer.
propagation_source: PeerId, propagation_source: PeerId,
/// The id of the message we are validating and propagating. /// The id of the message we are validating and propagating.
message_id: MessageId, message_id: MessageId,
/// The result of the validation
validation_result: MessageAcceptance,
}, },
/// Reports a peer to the peer manager for performing an action. /// Reports a peer to the peer manager for performing an action.
ReportPeer { peer_id: PeerId, action: PeerAction }, ReportPeer { peer_id: PeerId, action: PeerAction },
@ -216,9 +218,10 @@ fn spawn_service<T: BeaconChainTypes>(
NetworkMessage::SendError{ peer_id, error, id, reason } => { NetworkMessage::SendError{ peer_id, error, id, reason } => {
service.libp2p.respond_with_error(peer_id, id, error, reason); service.libp2p.respond_with_error(peer_id, id, error, reason);
} }
NetworkMessage::Validate { NetworkMessage::ValidationResult {
propagation_source, propagation_source,
message_id, message_id,
validation_result,
} => { } => {
trace!(service.log, "Propagating gossipsub message"; trace!(service.log, "Propagating gossipsub message";
"propagation_peer" => format!("{:?}", propagation_source), "propagation_peer" => format!("{:?}", propagation_source),
@ -227,7 +230,9 @@ fn spawn_service<T: BeaconChainTypes>(
service service
.libp2p .libp2p
.swarm .swarm
.validate_message(&propagation_source, message_id); .report_message_validation_result(
&propagation_source, message_id, validation_result
);
} }
NetworkMessage::Publish { messages } => { NetworkMessage::Publish { messages } => {
let mut topic_kinds = Vec::new(); let mut topic_kinds = Vec::new();

View File

@ -172,7 +172,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
} else { } else {
// there is no finalized chain that matches this peer's last finalized target // there is no finalized chain that matches this peer's last finalized target
// create a new finalized chain // create a new finalized chain
debug!(self.log, "New finalized chain added to sync"; "peer_id" => format!("{:?}", peer_id), "start_epoch" => local_finalized_slot, "end_slot" => remote_finalized_slot, "finalized_root" => format!("{}", remote_info.finalized_root)); debug!(self.log, "New finalized chain added to sync"; "peer_id" => format!("{:?}", peer_id), "start_slot" => local_finalized_slot, "end_slot" => remote_finalized_slot, "finalized_root" => format!("{}", remote_info.finalized_root));
self.chains.new_finalized_chain( self.chains.new_finalized_chain(
local_info.finalized_epoch, local_info.finalized_epoch,