From 6ca4f4709bc827e1344a3a0f5e24ac8a3cd52c65 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 25 Mar 2020 22:18:06 +1100 Subject: [PATCH] Connects the attestation service to network components (#961) * Sends attestations to the attestation service for processing * Adds 'attnets' field to local ENR * Adds ENR bitfield modification logic * Link attestation service to discovery - Updates discv5 - Links discover events to discovery - Support for ENRBitfield * Adds discovery config params, correct warnings * Rust fmt fixes * Correct tests --- Cargo.lock | 55 ++++---- beacon_node/client/src/builder.rs | 2 +- beacon_node/eth2-libp2p/Cargo.toml | 3 +- beacon_node/eth2-libp2p/src/behaviour.rs | 40 +++++- beacon_node/eth2-libp2p/src/config.rs | 8 +- .../eth2-libp2p/src/discovery/enr_helpers.rs | 31 ++++- beacon_node/eth2-libp2p/src/discovery/mod.rs | 121 +++++++++++++++++- beacon_node/eth2-libp2p/src/service.rs | 11 +- beacon_node/eth2-libp2p/src/types/topics.rs | 6 + beacon_node/eth2-libp2p/tests/common/mod.rs | 4 +- beacon_node/eth2-libp2p/tests/noise.rs | 12 +- .../network/src/attestation_service/mod.rs | 36 +++++- .../src/attestation_service/process.rs | 55 ++++++++ beacon_node/network/src/router/mod.rs | 28 ++-- beacon_node/network/src/router/processor.rs | 59 +-------- beacon_node/network/src/service.rs | 59 ++++++--- 16 files changed, 381 insertions(+), 149 deletions(-) create mode 100644 beacon_node/network/src/attestation_service/process.rs diff --git a/Cargo.lock b/Cargo.lock index 64c0e6836..c679d5d78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1090,7 +1090,7 @@ dependencies = [ [[package]] name = "enr" version = "0.1.0-alpha.3" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "base64 0.12.0", "bs58 0.3.0", @@ -1210,6 +1210,7 @@ dependencies = [ "error-chain", "eth2_ssz", "eth2_ssz_derive", + "eth2_ssz_types", "fnv", "futures", "hex 0.3.2", @@ -2058,7 +2059,7 @@ dependencies = [ [[package]] name = "libp2p" version = "0.13.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "bytes", "futures", @@ -2097,7 +2098,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.13.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "asn1_der", "bs58 0.3.0", @@ -2132,7 +2133,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "quote 1.0.3", "syn 1.0.16", @@ -2141,7 +2142,7 @@ dependencies = [ [[package]] name = "libp2p-deflate" version = "0.5.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "flate2", "futures", @@ -2152,7 +2153,7 @@ dependencies = [ [[package]] name = "libp2p-discv5" version = "0.1.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "arrayvec 0.4.12", "bigint", @@ -2183,7 +2184,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "futures", "libp2p-core", @@ -2194,7 +2195,7 @@ dependencies = [ [[package]] name = "libp2p-floodsub" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "bs58 0.3.0", "bytes", @@ -2212,7 +2213,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.1.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "base64 0.10.1", "bs58 0.2.5", @@ -2237,7 +2238,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.13.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "bytes", "futures", @@ -2256,7 +2257,7 @@ dependencies = [ [[package]] name = "libp2p-kad" version = "0.13.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "arrayvec 0.5.1", "bytes", @@ -2283,7 +2284,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "data-encoding", "dns-parser", @@ -2305,7 +2306,7 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "bytes", "fnv", @@ -2321,7 +2322,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.11.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "bytes", "curve25519-dalek 1.2.3", @@ -2341,7 +2342,7 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "bytes", "futures", @@ -2358,7 +2359,7 @@ dependencies = [ [[package]] name = "libp2p-plaintext" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "bytes", "futures", @@ -2373,7 +2374,7 @@ dependencies = [ [[package]] name = "libp2p-secio" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "aes-ctr", "bytes", @@ -2402,7 +2403,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.3.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "futures", "libp2p-core", @@ -2415,7 +2416,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "bytes", "futures", @@ -2431,7 +2432,7 @@ dependencies = [ [[package]] name = "libp2p-uds" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "futures", "libp2p-core", @@ -2442,7 +2443,7 @@ dependencies = [ [[package]] name = "libp2p-wasm-ext" version = "0.6.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "futures", "js-sys", @@ -2456,7 +2457,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "bytes", "futures", @@ -2474,7 +2475,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "futures", "libp2p-core", @@ -2774,7 +2775,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.6.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "bytes", "futures", @@ -3007,7 +3008,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.6.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "arrayref", "bs58 0.3.0", @@ -3024,7 +3025,7 @@ dependencies = [ [[package]] name = "parity-multihash" version = "0.2.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "blake2", "bytes", @@ -3800,7 +3801,7 @@ dependencies = [ [[package]] name = "rw-stream-sink" version = "0.1.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=8c272a9a4d115d9a1d33791479527cdcba781829#8c272a9a4d115d9a1d33791479527cdcba781829" +source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" dependencies = [ "bytes", "futures", diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index ab7897ce8..74cec8ade 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -252,7 +252,7 @@ where } /// Immediately starts the networking stack. - pub fn network(mut self, config: &mut NetworkConfig) -> Result { + pub fn network(mut self, config: &NetworkConfig) -> Result { let beacon_chain = self .beacon_chain .clone() diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index dd979a01c..f2428eeec 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -8,8 +8,9 @@ edition = "2018" hex = "0.3" # rust-libp2p is presently being sourced from a Sigma Prime fork of the # `libp2p/rust-libp2p` repository. -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "8c272a9a4d115d9a1d33791479527cdcba781829" } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "44d7a9c9cd7be74109817bcabe74b991d5bd0fee" } types = { path = "../../eth2/types" } +eth2_ssz_types = { path = "../../eth2/utils/ssz_types" } serde = "1.0.102" serde_derive = "1.0.102" eth2_ssz = "0.1.2" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 4d637bcf1..a5a5b575b 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,5 +1,6 @@ use crate::discovery::Discovery; use crate::rpc::{RPCEvent, RPCMessage, RPC}; +use crate::types::GossipEncoding; use crate::Enr; use crate::{error, GossipTopic, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use futures::prelude::*; @@ -13,9 +14,9 @@ use libp2p::{ NetworkBehaviour, PeerId, }; use lru::LruCache; -use slog::{debug, o, warn}; +use slog::{crit, debug, o, warn}; use std::sync::Arc; -use types::{EnrForkId, EthSpec}; +use types::{EnrForkId, EthSpec, SubnetId}; const MAX_IDENTIFY_ADDRESSES: usize = 20; @@ -55,6 +56,7 @@ impl Behaviour>, + enr_fork_id: EnrForkId, log: &slog::Logger, ) -> error::Result { let local_peer_id = local_key.public().into_peer_id(); @@ -69,7 +71,13 @@ impl Behaviour Behaviour bool { let pos = self @@ -124,6 +138,12 @@ impl Behaviour>) { for message in messages { @@ -170,6 +190,20 @@ impl Behaviour e); + } + } + + /// A request to search for peers connected to a long-lived subnet. + pub fn peers_request(&mut self, subnet_id: SubnetId) { + self.discovery.peers_request(subnet_id); + } + /// Updates the local ENR's "eth2" field with the latest EnrForkId. pub fn update_fork_version(&mut self, enr_fork_id: EnrForkId) { self.discovery.update_eth2_enr(enr_fork_id); diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index 9de08f36e..b10bd362e 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -7,7 +7,6 @@ use serde_derive::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use std::path::PathBuf; use std::time::Duration; -use types::EnrForkId; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] @@ -64,9 +63,6 @@ pub struct Config { /// List of extra topics to initially subscribe to as strings. pub topics: Vec, - /// The initial ENR fork id. - pub enr_fork_id: EnrForkId, - /// Introduces randomization in network propagation of messages. This should only be set for /// testing purposes and will likely be removed in future versions. // TODO: Remove this functionality for mainnet @@ -112,10 +108,11 @@ impl Default for Config { // discv5 configuration let discv5_config = Discv5ConfigBuilder::new() .request_timeout(Duration::from_secs(4)) - .request_retries(1) + .request_retries(2) .enr_update(true) // update IP based on PONG responses .enr_peer_update_min(2) // prevents NAT's should be raised for mainnet .query_parallelism(5) + .query_timeout(Duration::from_secs(2)) .ip_limit(false) // limits /24 IP's in buckets. Enable for mainnet .ping_interval(Duration::from_secs(300)) .build(); @@ -136,7 +133,6 @@ impl Default for Config { libp2p_nodes: vec![], client_version: version::version(), topics, - enr_fork_id: EnrForkId::default(), propagation_percentage: None, } } diff --git a/beacon_node/eth2-libp2p/src/discovery/enr_helpers.rs b/beacon_node/eth2-libp2p/src/discovery/enr_helpers.rs index 203f9ac84..de3960b9b 100644 --- a/beacon_node/eth2-libp2p/src/discovery/enr_helpers.rs +++ b/beacon_node/eth2-libp2p/src/discovery/enr_helpers.rs @@ -5,20 +5,28 @@ use libp2p::core::identity::Keypair; use libp2p::discv5::enr::{CombinedKey, EnrBuilder}; use slog::{debug, warn}; use ssz::Encode; +use ssz_types::BitVector; use std::convert::TryInto; use std::fs::File; use std::io::prelude::*; use std::path::Path; use std::str::FromStr; +use types::{EnrForkId, EthSpec}; + +/// The ENR field specifying the fork id. +pub const ETH2_ENR_KEY: &'static str = "eth2"; +/// The ENR field specifying the subnet bitfield. +pub const BITFIELD_ENR_KEY: &'static str = "attnets"; /// Loads an ENR from file if it exists and matches the current NodeId and sequence number. If none /// exists, generates a new one. /// /// If an ENR exists, with the same NodeId, this function checks to see if the loaded ENR from /// disk is suitable to use, otherwise we increment our newly generated ENR's sequence number. -pub fn build_or_load_enr( +pub fn build_or_load_enr( local_key: Keypair, config: &NetworkConfig, + enr_fork_id: EnrForkId, log: &slog::Logger, ) -> Result { // Build the local ENR. @@ -28,7 +36,7 @@ pub fn build_or_load_enr( .try_into() .map_err(|_| "Invalid key type for ENR records")?; - let mut local_enr = build_enr(&enr_key, config)?; + let mut local_enr = build_enr::(&enr_key, config, enr_fork_id)?; let enr_f = config.network_dir.join(ENR_FILENAME); if let Ok(mut enr_file) = File::open(enr_f.clone()) { @@ -68,7 +76,11 @@ pub fn build_or_load_enr( } /// Builds a lighthouse ENR given a `NetworkConfig`. -fn build_enr(enr_key: &CombinedKey, config: &NetworkConfig) -> Result { +fn build_enr( + enr_key: &CombinedKey, + config: &NetworkConfig, + enr_fork_id: EnrForkId, +) -> Result { let mut builder = EnrBuilder::new("v4"); if let Some(enr_address) = config.enr_address { builder.ip(enr_address); @@ -82,7 +94,12 @@ fn build_enr(enr_key: &CombinedKey, config: &NetworkConfig) -> Result::new(); + + builder.add_value(BITFIELD_ENR_KEY.into(), bitfield.as_ssz_bytes()); builder .tcp(config.libp2p_port) @@ -97,9 +114,13 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool { (local_enr.ip().is_none() || local_enr.ip() == disk_enr.ip()) // tcp ports must match && local_enr.tcp() == disk_enr.tcp() - && local_enr.get("eth2") == disk_enr.get("eth2") + // must match on the same fork + && local_enr.get(ETH2_ENR_KEY) == disk_enr.get(ETH2_ENR_KEY) // take preference over disk udp port if one is not specified && (local_enr.udp().is_none() || local_enr.udp() == disk_enr.udp()) + // we need the BITFIELD_ENR_KEY key to match, otherwise we use a new ENR. This will likely only + // be true for non-validating nodes + && local_enr.get(BITFIELD_ENR_KEY) == disk_enr.get(BITFIELD_ENR_KEY) } /// Saves an ENR to disk diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs index 6b5593ffc..1ccc95767 100644 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -2,8 +2,10 @@ mod enr_helpers; use crate::metrics; +use crate::types::EnrBitfield; use crate::Enr; use crate::{error, NetworkConfig, NetworkGlobals, PeerInfo}; +use enr_helpers::{BITFIELD_ENR_KEY, ETH2_ENR_KEY}; use futures::prelude::*; use libp2p::core::{identity::Keypair, ConnectedPoint, Multiaddr, PeerId}; use libp2p::discv5::enr::NodeId; @@ -11,7 +13,8 @@ use libp2p::discv5::{Discv5, Discv5Event}; use libp2p::multiaddr::Protocol; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler}; use slog::{debug, info, warn}; -use ssz::Encode; +use ssz::{Decode, Encode}; +use ssz_types::BitVector; use std::collections::HashSet; use std::net::SocketAddr; use std::path::Path; @@ -19,7 +22,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::timer::Delay; -use types::{EnrForkId, EthSpec}; +use types::{EnrForkId, EthSpec, SubnetId}; /// Maximum seconds before searching for extra peers. const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 120; @@ -27,6 +30,8 @@ const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 120; const INITIAL_SEARCH_DELAY: u64 = 5; /// Local ENR storage filename. const ENR_FILENAME: &str = "enr.dat"; +/// Number of peers we'd like to have connected to a given long-lived subnet. +const TARGET_SUBNET_PEERS: u64 = 3; /// Lighthouse discovery behaviour. This provides peer management and discovery using the Discv5 /// libp2p protocol. @@ -66,13 +71,15 @@ impl Discovery { pub fn new( local_key: &Keypair, config: &NetworkConfig, + enr_fork_id: EnrForkId, network_globals: Arc>, log: &slog::Logger, ) -> error::Result { let log = log.clone(); // checks if current ENR matches that found on disk - let local_enr = enr_helpers::build_or_load_enr(local_key.clone(), config, &log)?; + let local_enr = + enr_helpers::build_or_load_enr::(local_key.clone(), config, enr_fork_id, &log)?; *network_globals.local_enr.write() = Some(local_enr.clone()); @@ -162,6 +169,51 @@ impl Discovery { self.discovery.enr_entries() } + /// Adds/Removes a subnet from the ENR Bitfield + pub fn update_enr_bitfield(&mut self, subnet_id: SubnetId, value: bool) -> Result<(), String> { + let id = *subnet_id as usize; + + let local_enr = self.discovery.local_enr(); + let bitfield_bytes = local_enr + .get(BITFIELD_ENR_KEY) + .ok_or_else(|| "ENR bitfield non-existent")?; + + let mut current_bitfield = + BitVector::::from_ssz_bytes(bitfield_bytes) + .map_err(|_| "Could not decode local ENR SSZ bitfield")?; + + if id >= current_bitfield.len() { + return Err(format!( + "Subnet id: {} is outside the ENR bitfield length: {}", + id, + current_bitfield.len() + )); + } + + if current_bitfield + .get(id) + .map_err(|_| String::from("Subnet ID out of bounds"))? + == value + { + return Err(format!( + "Subnet id: {} already in the local ENR already has value: {}", + id, value + )); + } + + // set the subnet bitfield in the ENR + current_bitfield + .set(id, value) + .map_err(|_| String::from("Subnet ID out of bounds, could not set subnet ID"))?; + + // insert the bitfield into the ENR record + let _ = self + .discovery + .enr_insert(BITFIELD_ENR_KEY, current_bitfield.as_ssz_bytes()); + + Ok(()) + } + /// Updates the `eth2` field of our local ENR. pub fn update_eth2_enr(&mut self, enr_fork_id: EnrForkId) { // to avoid having a reference to the spec constant, for the logging we assume @@ -180,7 +232,7 @@ impl Discovery { let _ = self .discovery - .enr_insert("eth2".into(), enr_fork_id.as_ssz_bytes()) + .enr_insert(ETH2_ENR_KEY.into(), enr_fork_id.as_ssz_bytes()) .map_err(|e| { warn!( self.log, @@ -190,6 +242,39 @@ impl Discovery { }); } + /// A request to find peers on a given subnet. + // TODO: This logic should be improved with added sophistication in peer management + // This currently checks for currently connected peers and if we don't have + // PEERS_WANTED_BEFORE_DISCOVERY connected to a given subnet we search for more. + pub fn peers_request(&mut self, subnet_id: SubnetId) { + // TODO: Add PeerManager struct to do this loop for us + + let peers_on_subnet = self + .network_globals + .connected_peer_set + .read() + .values() + .fold(0, |found_peers, peer_info| { + if peer_info.on_subnet(subnet_id) { + found_peers + 1 + } else { + found_peers + } + }); + + if peers_on_subnet < TARGET_SUBNET_PEERS { + debug!(self.log, "Searching for peers for subnet"; + "subnet_id" => *subnet_id, + "connected_peers_on_subnet" => peers_on_subnet, + "target_subnet_peers" => TARGET_SUBNET_PEERS + ); + // TODO: Update to predicate search + self.find_peers(); + } + } + + /* Internal Functions */ + /// Search for new peers using the underlying discovery mechanism. fn find_peers(&mut self) { // pick a random NodeId @@ -217,11 +302,35 @@ where } fn inject_connected(&mut self, peer_id: PeerId, _endpoint: ConnectedPoint) { - // TODO: Search for a known ENR once discv5 is updated. + // Find ENR info about a peer if possible. + let mut peer_info = PeerInfo::new(); + if let Some(enr) = self.discovery.enr_of_peer(&peer_id) { + let bitfield = match enr.get(BITFIELD_ENR_KEY) { + Some(bitfield_bytes) => { + match EnrBitfield::::from_ssz_bytes(bitfield_bytes) { + Ok(bitfield) => bitfield, + Err(e) => { + warn!(self.log, "Peer had invalid ENR bitfield"; + "peer_id" => format!("{}", peer_id), + "error" => format!("{:?}", e)); + return; + } + } + } + None => { + warn!(self.log, "Peer has no ENR bitfield"; + "peer_id" => format!("{}", peer_id)); + return; + } + }; + + peer_info.enr_bitfield = Some(bitfield); + } + self.network_globals .connected_peer_set .write() - .insert(peer_id, PeerInfo::new()); + .insert(peer_id, peer_info); // TODO: Drop peers if over max_peer limit metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 6a2fc48c6..b056eac05 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -24,7 +24,7 @@ use std::io::{Error, ErrorKind}; use std::sync::Arc; use std::time::Duration; use tokio::timer::DelayQueue; -use types::EthSpec; +use types::{EnrForkId, EthSpec}; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = Behaviour, TSpec>; @@ -56,6 +56,7 @@ pub struct Service { impl Service { pub fn new( config: &NetworkConfig, + enr_fork_id: EnrForkId, log: slog::Logger, ) -> error::Result<(Arc>, Self)> { trace!(log, "Libp2p Service starting"); @@ -81,7 +82,13 @@ impl Service { // Set up the transport - tcp/ws with noise/secio and mplex/yamux let transport = build_transport(local_keypair.clone()); // Lighthouse network behaviour - let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; + let behaviour = Behaviour::new( + &local_keypair, + config, + network_globals.clone(), + enr_fork_id, + &log, + )?; Swarm::new(transport, behaviour, local_peer_id.clone()) }; diff --git a/beacon_node/eth2-libp2p/src/types/topics.rs b/beacon_node/eth2-libp2p/src/types/topics.rs index 98844a811..3745746a8 100644 --- a/beacon_node/eth2-libp2p/src/types/topics.rs +++ b/beacon_node/eth2-libp2p/src/types/topics.rs @@ -119,6 +119,12 @@ impl Into for GossipTopic { } } +impl From for GossipKind { + fn from(subnet_id: SubnetId) -> Self { + GossipKind::CommitteeIndex(subnet_id) + } +} + // helper functions // Determines if a string is a committee topic. diff --git a/beacon_node/eth2-libp2p/tests/common/mod.rs b/beacon_node/eth2-libp2p/tests/common/mod.rs index c6e8054c8..15918a21a 100644 --- a/beacon_node/eth2-libp2p/tests/common/mod.rs +++ b/beacon_node/eth2-libp2p/tests/common/mod.rs @@ -5,7 +5,7 @@ use eth2_libp2p::NetworkConfig; use eth2_libp2p::Service as LibP2PService; use slog::{debug, error, o, Drain}; use std::time::Duration; -use types::MinimalEthSpec; +use types::{EnrForkId, MinimalEthSpec}; type E = MinimalEthSpec; use tempdir::TempDir; @@ -52,7 +52,7 @@ pub fn build_libp2p_instance( ) -> LibP2PService { let config = build_config(port, boot_nodes, secret_key); // launch libp2p service - LibP2PService::new(&config, log.clone()) + LibP2PService::new(&config, EnrForkId::default(), log.clone()) .expect("should build libp2p instance") .1 } diff --git a/beacon_node/eth2-libp2p/tests/noise.rs b/beacon_node/eth2-libp2p/tests/noise.rs index 52e05438e..38713903b 100644 --- a/beacon_node/eth2-libp2p/tests/noise.rs +++ b/beacon_node/eth2-libp2p/tests/noise.rs @@ -1,7 +1,7 @@ #![cfg(test)] use crate::behaviour::{Behaviour, BehaviourEvent}; use crate::multiaddr::Protocol; -use ::types::MinimalEthSpec; +use ::types::{EnrForkId, MinimalEthSpec}; use eth2_libp2p::*; use futures::prelude::*; use libp2p::core::identity::Keypair; @@ -42,7 +42,13 @@ fn build_secio_swarm( // Set up the transport - tcp/ws with secio and mplex/yamux let transport = build_secio_transport(local_keypair.clone()); // Lighthouse network behaviour - let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; + let behaviour = Behaviour::new( + &local_keypair, + config, + network_globals.clone(), + EnrForkId::default(), + &log, + )?; Swarm::new(transport, behaviour, local_peer_id.clone()) }; @@ -122,7 +128,7 @@ fn test_secio_noise_fallback() { let log = common::build_log(log_level, enable_logging); let noisy_config = common::build_config(56010, vec![], None); - let mut noisy_node = Service::new(&noisy_config, log.clone()) + let mut noisy_node = Service::new(&noisy_config, EnrForkId::default(), log.clone()) .expect("should build a libp2p instance") .1; diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index 3b16af008..23be85114 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -3,19 +3,17 @@ //! determines whether attestations should be aggregated and/or passed to the beacon node. use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::{types::GossipKind, NetworkGlobals}; +use eth2_libp2p::{types::GossipKind, MessageId, NetworkGlobals, PeerId}; use futures::prelude::*; use hashmap_delay::HashSetDelay; use rand::seq::SliceRandom; use rest_types::ValidatorSubscription; use slog::{crit, debug, error, o, warn}; use slot_clock::SlotClock; -use std::boxed::Box; use std::collections::VecDeque; use std::sync::Arc; use std::time::{Duration, Instant}; -use types::{Attestation, SubnetId}; -use types::{EthSpec, Slot}; +use types::{Attestation, EthSpec, SignedAggregateAndProof, Slot, SubnetId}; /// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the /// slot is less than this number, skip the peer discovery process. @@ -44,6 +42,8 @@ pub enum AttServiceMessage { EnrRemove(SubnetId), /// Discover peers for a particular subnet. DiscoverPeers(SubnetId), + /// Propagate an attestation if it's deemed valid. + Propagate(PeerId, MessageId), } pub struct AttestationService { @@ -152,11 +152,29 @@ impl AttestationService { Ok(()) } - pub fn handle_attestation( + /// Handles un-aggregated attestations from the network. + pub fn handle_unaggregated_attestation( &mut self, + message_id: MessageId, + peer_id: PeerId, subnet: SubnetId, - attestation: Box>, + attestation: Attestation, ) { + // TODO: Handle attestation processing + self.events + .push_back(AttServiceMessage::Propagate(peer_id, message_id)); + } + + /// Handles aggregate attestations from the network. + pub fn handle_aggregate_attestation( + &mut self, + message_id: MessageId, + peer_id: PeerId, + attestation: SignedAggregateAndProof, + ) { + // TODO: Handle attestation processing + self.events + .push_back(AttServiceMessage::Propagate(peer_id, message_id)); } /* Internal private functions */ @@ -231,6 +249,12 @@ impl AttestationService { self.discover_peers .insert_at((subnet_id, subscription_slot), duration_to_discover); } + } else { + // TODO: Send the time frame needed to have a peer connected, so that we can + // maintain peers for a least this duration. + // We may want to check the global PeerInfo to see estimated timeouts for each + // peer before they can be removed. + return Err("Not enough time for a discovery search"); } Ok(()) } diff --git a/beacon_node/network/src/attestation_service/process.rs b/beacon_node/network/src/attestation_service/process.rs new file mode 100644 index 000000000..4995b3a0b --- /dev/null +++ b/beacon_node/network/src/attestation_service/process.rs @@ -0,0 +1,55 @@ + + /// Process a gossip message declaring a new attestation. + /// + /// Not currently implemented. + pub fn on_attestation_gossip(&mut self, _peer_id: PeerId, _msg: Attestation) { + // TODO: Handle subnet gossip + /* + match self.chain.process_attestation(msg.clone()) { + Ok(outcome) => match outcome { + AttestationProcessingOutcome::Processed => { + debug!( + self.log, + "Processed attestation"; + "source" => "gossip", + "peer" => format!("{:?}",peer_id), + "block_root" => format!("{}", msg.data.beacon_block_root), + "slot" => format!("{}", msg.data.slot), + ); + } + AttestationProcessingOutcome::UnknownHeadBlock { beacon_block_root } => { + // TODO: Maintain this attestation and re-process once sync completes + trace!( + self.log, + "Attestation for unknown block"; + "peer_id" => format!("{:?}", peer_id), + "block" => format!("{}", beacon_block_root) + ); + // we don't know the block, get the sync manager to handle the block lookup + self.send_to_sync(SyncMessage::UnknownBlockHash(peer_id, beacon_block_root)); + } + AttestationProcessingOutcome::FutureEpoch { .. } + | AttestationProcessingOutcome::PastEpoch { .. } + | AttestationProcessingOutcome::UnknownTargetRoot { .. } + | AttestationProcessingOutcome::FinalizedSlot { .. } => {} // ignore the attestation + AttestationProcessingOutcome::Invalid { .. } + | AttestationProcessingOutcome::EmptyAggregationBitfield { .. } + | AttestationProcessingOutcome::AttestsToFutureBlock { .. } + | AttestationProcessingOutcome::InvalidSignature + | AttestationProcessingOutcome::NoCommitteeForSlotAndIndex { .. } + | AttestationProcessingOutcome::BadTargetEpoch { .. } => { + // the peer has sent a bad attestation. Remove them. + self.network.disconnect(peer_id, GoodbyeReason::Fault); + } + }, + Err(_) => { + // error is logged during the processing therefore no error is logged here + trace!( + self.log, + "Erroneous gossip attestation ssz"; + "ssz" => format!("0x{}", hex::encode(msg.as_ssz_bytes())), + ); + } + }; + */ + } diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index fa35d95e3..f3c5f74ca 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -16,7 +16,7 @@ use eth2_libp2p::{ use futures::future::Future; use futures::stream::Stream; use processor::Processor; -use slog::{debug, o, trace, warn}; +use slog::{crit, debug, o, trace, warn}; use std::sync::Arc; use tokio::sync::mpsc; use types::EthSpec; @@ -224,19 +224,6 @@ impl Router { } self.processor.on_block_gossip(peer_id, block); } - PubsubData::AggregateAndProofAttestation(_agg_attestation) => { - // TODO: Handle propagation conditions - self.propagate_message(id, peer_id); - // TODO Handle aggregate attestion - // self.processor - // .on_attestation_gossip(peer_id.clone(), &agg_attestation); - } - PubsubData::Attestation(boxed_shard_attestation) => { - // TODO: Handle propagation conditions - self.propagate_message(id, peer_id.clone()); - self.processor - .on_attestation_gossip(peer_id, boxed_shard_attestation.1); - } PubsubData::VoluntaryExit(_exit) => { // TODO: Apply more sophisticated validation self.propagate_message(id, peer_id.clone()); @@ -255,6 +242,19 @@ impl Router { // TODO: Handle attester slashings debug!(self.log, "Received an attester slashing"; "peer_id" => format!("{}", peer_id) ); } + // Attestations should never reach the router. + PubsubData::AggregateAndProofAttestation(_agg_attestation) => { + crit!( + self.log, + "Attestations should always be handled by the attestation service" + ); + } + PubsubData::Attestation(_boxed_subnet_attestation) => { + crit!( + self.log, + "Attestations should always be handled by the attestation service" + ); + } } } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 682c67c46..a6c5edccd 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -1,8 +1,6 @@ use crate::service::NetworkMessage; use crate::sync::SyncMessage; -use beacon_chain::{ - AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockProcessingOutcome, -}; +use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; @@ -553,61 +551,6 @@ impl Processor { // TODO: Update with correct block gossip checking true } - - /// Process a gossip message declaring a new attestation. - /// - /// Not currently implemented. - pub fn on_attestation_gossip(&mut self, _peer_id: PeerId, _msg: Attestation) { - // TODO: Handle subnet gossip - /* - match self.chain.process_attestation(msg.clone()) { - Ok(outcome) => match outcome { - AttestationProcessingOutcome::Processed => { - debug!( - self.log, - "Processed attestation"; - "source" => "gossip", - "peer" => format!("{:?}",peer_id), - "block_root" => format!("{}", msg.data.beacon_block_root), - "slot" => format!("{}", msg.data.slot), - ); - } - AttestationProcessingOutcome::UnknownHeadBlock { beacon_block_root } => { - // TODO: Maintain this attestation and re-process once sync completes - trace!( - self.log, - "Attestation for unknown block"; - "peer_id" => format!("{:?}", peer_id), - "block" => format!("{}", beacon_block_root) - ); - // we don't know the block, get the sync manager to handle the block lookup - self.send_to_sync(SyncMessage::UnknownBlockHash(peer_id, beacon_block_root)); - } - AttestationProcessingOutcome::FutureEpoch { .. } - | AttestationProcessingOutcome::PastEpoch { .. } - | AttestationProcessingOutcome::UnknownTargetRoot { .. } - | AttestationProcessingOutcome::FinalizedSlot { .. } => {} // ignore the attestation - AttestationProcessingOutcome::Invalid { .. } - | AttestationProcessingOutcome::EmptyAggregationBitfield { .. } - | AttestationProcessingOutcome::AttestsToFutureBlock { .. } - | AttestationProcessingOutcome::InvalidSignature - | AttestationProcessingOutcome::NoCommitteeForSlotAndIndex { .. } - | AttestationProcessingOutcome::BadTargetEpoch { .. } => { - // the peer has sent a bad attestation. Remove them. - self.network.disconnect(peer_id, GoodbyeReason::Fault); - } - }, - Err(_) => { - // error is logged during the processing therefore no error is logged here - trace!( - self.log, - "Erroneous gossip attestation ssz"; - "ssz" => format!("0x{}", hex::encode(msg.as_ssz_bytes())), - ); - } - }; - */ - } } /// Build a `StatusMessage` representing the state of the given `beacon_chain`. diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 319d8de53..8c1cd6cae 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -8,7 +8,7 @@ use crate::{ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::{rpc::RPCRequest, Enr, Libp2pEvent, MessageId, NetworkGlobals, PeerId, Swarm}; -use eth2_libp2p::{PubsubMessage, RPCEvent}; +use eth2_libp2p::{PubsubData, PubsubMessage, RPCEvent}; use futures::prelude::*; use futures::Stream; use rest_types::ValidatorSubscription; @@ -55,7 +55,7 @@ pub struct NetworkService { impl NetworkService { pub fn start( beacon_chain: Arc>, - config: &mut NetworkConfig, + config: &NetworkConfig, executor: &TaskExecutor, network_log: slog::Logger, ) -> error::Result<( @@ -77,8 +77,8 @@ impl NetworkService { let propagation_percentage = config.propagation_percentage; - // set the local enr_fork_id - config.enr_fork_id = beacon_chain + // build the current enr_fork_id for adding to our local ENR + let enr_fork_id = beacon_chain .enr_fork_id() .map_err(|e| format!("Could not get the current ENR fork version: {:?}", e))?; @@ -88,7 +88,8 @@ impl NetworkService { .map_err(|e| format!("Could not get the next fork update duration: {:?}", e))?; // launch libp2p service - let (network_globals, mut libp2p) = LibP2PService::new(config, network_log.clone())?; + let (network_globals, mut libp2p) = + LibP2PService::new(config, enr_fork_id, network_log.clone())?; for enr in load_dht::(store.clone()) { libp2p.swarm.add_enr(enr); @@ -262,11 +263,26 @@ fn spawn_service( while let Ok(Async::Ready(Some(attestation_service_message))) = service.attestation_service.poll() { match attestation_service_message { // TODO: Implement - AttServiceMessage::Subscribe(_subnet) => { }, - AttServiceMessage::Unsubscribe(_subnet) => { }, - AttServiceMessage::EnrAdd(_subnet) => { }, - AttServiceMessage::EnrRemove(_subnet) => { }, - AttServiceMessage::DiscoverPeers(_subnet) => { }, + AttServiceMessage::Subscribe(subnet_id) => { + service.libp2p.swarm.subscribe_to_subnet(subnet_id); + }, + AttServiceMessage::Unsubscribe(subnet_id) => { + service.libp2p.swarm.subscribe_to_subnet(subnet_id); + }, + AttServiceMessage::EnrAdd(subnet_id) => { + service.libp2p.swarm.update_enr_subnet(subnet_id, true); + }, + AttServiceMessage::EnrRemove(subnet_id) => { + service.libp2p.swarm.update_enr_subnet(subnet_id, false); + }, + AttServiceMessage::DiscoverPeers(subnet_id) => { + service.libp2p.swarm.peers_request(subnet_id); + }, + AttServiceMessage::Propagate(source, message_id) => { + service.libp2p + .swarm + .propagate_message(&source, message_id); + } } } @@ -276,8 +292,6 @@ fn spawn_service( match service.libp2p.poll() { Ok(Async::Ready(Some(event))) => match event { Libp2pEvent::RPC(peer_id, rpc_event) => { - // trace!(log, "Received RPC"; "rpc" => format!("{}", rpc_event)); - // if we received a Goodbye message, drop and ban the peer if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event { peers_to_ban.push(peer_id.clone()); @@ -304,9 +318,24 @@ fn spawn_service( message, .. } => { - service.router_send - .try_send(RouterMessage::PubsubMessage(id, source, message)) - .map_err(|_| { debug!(log, "Failed to send pubsub message to router");})?; + + match message.data { + // attestation information gets processed in the attestation service + PubsubData::AggregateAndProofAttestation(signed_aggregate_and_proof) => { + service.attestation_service.handle_aggregate_attestation(id, source, *signed_aggregate_and_proof); + }, + PubsubData::Attestation(subnet_and_attestation) => { + let subnet = subnet_and_attestation.0; + let attestation = subnet_and_attestation.1; + service.attestation_service.handle_unaggregated_attestation(id, source, subnet, attestation); + } + _ => { + // all else is sent to the router + service.router_send + .try_send(RouterMessage::PubsubMessage(id, source, message)) + .map_err(|_| { debug!(log, "Failed to send pubsub message to router");})?; + } + } } Libp2pEvent::PeerSubscribed(_, _) => {} },