From 5a74239ebcf0473120cdfc1acb4bf31fcc338f24 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 8 Aug 2019 14:58:33 +1000 Subject: [PATCH] Add decoding/encoding for extended gossip topics. Correct logging CLI --- beacon_node/Cargo.toml | 4 +- beacon_node/eth2-libp2p/src/config.rs | 2 + beacon_node/network/src/message_handler.rs | 86 ++++++++++++++++++---- beacon_node/src/main.rs | 17 +---- 4 files changed, 79 insertions(+), 30 deletions(-) diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 9124047e4..cba73b8a4 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -11,7 +11,7 @@ store = { path = "./store" } client = { path = "client" } version = { path = "version" } clap = "2.32.0" -slog = { version = "^2.2.3" , features = ["max_level_trace"] } +slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_trace"] } slog-term = "^2.4.0" slog-async = "^2.3.0" ctrlc = { version = "3.1.1", features = ["termination"] } @@ -22,3 +22,5 @@ exit-future = "0.1.3" env_logger = "0.6.1" dirs = "2.0.1" logging = { path = "../eth2/utils/logging" } +slog-scope = "4.1.2" +slog-stdlog = "3.0.5" diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index d7648ec3f..7cb501c1f 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -7,6 +7,8 @@ use std::path::PathBuf; use std::time::Duration; /// The gossipsub topic names. +// These constants form a topic name of the form /TOPIC_PREFIX/TOPIC/ENCODING_POSTFIX +// For example /eth2/beacon_block/ssz pub const TOPIC_PREFIX: &str = "eth2"; pub const TOPIC_ENCODING_POSTFIX: &str = "ssz"; pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 72a507ad7..b86dcb969 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -10,11 +10,13 @@ use eth2_libp2p::{ }; use futures::future::Future; use futures::stream::Stream; -use slog::{debug, warn}; +use slog::{debug, trace, warn}; use ssz::{Decode, DecodeError}; use std::sync::Arc; use tokio::sync::mpsc; -use types::{Attestation, BeaconBlock, BeaconBlockHeader}; +use types::{ + Attestation, AttesterSlashing, BeaconBlock, BeaconBlockHeader, ProposerSlashing, VoluntaryExit, +}; /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { @@ -49,7 +51,7 @@ impl MessageHandler { executor: &tokio::runtime::TaskExecutor, log: slog::Logger, ) -> error::Result> { - debug!(log, "Service starting"); + trace!(log, "Service starting"); let (handler_send, handler_recv) = mpsc::unbounded_channel(); @@ -65,7 +67,6 @@ impl MessageHandler { }; // spawn handler task - // TODO: Handle manual termination of thread executor.spawn( handler_recv .for_each(move |msg| Ok(handler.handle_message(msg))) @@ -221,43 +222,79 @@ impl MessageHandler { /// Handle various RPC errors fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { //TODO: Handle error correctly - warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error)); + warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "request_id" => format!("{}", request_id), "Error" => format!("{:?}", error)); } /// Handle RPC messages fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { match gossip_message { PubsubMessage::Block(message) => match self.decode_gossip_block(message) { - Err(e) => { - debug!(self.log, "Invalid Gossiped Beacon Block"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e)); - } Ok(block) => { let _should_forward_on = self.sync .on_block_gossip(peer_id, block, &mut self.network_context); } + Err(e) => { + debug!(self.log, "Invalid gossiped beacon block"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e)); + } }, PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) { - Err(e) => { - debug!(self.log, "Invalid Gossiped Attestation"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e)); - } Ok(attestation) => { self.sync .on_attestation_gossip(peer_id, attestation, &mut self.network_context) } + Err(e) => { + debug!(self.log, "Invalid gossiped attestation"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e)); + } }, + PubsubMessage::VoluntaryExit(message) => match self.decode_gossip_exit(message) { + Ok(_exit) => { + // TODO: Handle exits + debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id) ); + } + Err(e) => { + debug!(self.log, "Invalid gossiped exit"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e)); + } + }, + PubsubMessage::ProposerSlashing(message) => { + match self.decode_gossip_proposer_slashing(message) { + Ok(_slashing) => { + // TODO: Handle proposer slashings + debug!(self.log, "Received a proposer slashing"; "peer_id" => format!("{}", peer_id) ); + } + Err(e) => { + debug!(self.log, "Invalid gossiped proposer slashing"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e)); + } + } + } + PubsubMessage::AttesterSlashing(message) => { + match self.decode_gossip_attestation_slashing(message) { + Ok(_slashing) => { + // TODO: Handle attester slashings + debug!(self.log, "Received an attester slashing"; "peer_id" => format!("{}", peer_id) ); + } + Err(e) => { + debug!(self.log, "Invalid gossiped attester slashing"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e)); + } + } + } PubsubMessage::Unknown(message) => { // Received a message from an unknown topic. Ignore for now - debug!(self.log, "Unknown Gossip Message"; "Peer" => format!("{}", peer_id), "Message" => format!("{:?}", message)); + debug!(self.log, "Unknown Gossip Message"; "peer_id" => format!("{}", peer_id), "Message" => format!("{:?}", message)); } } } - /* Decoding of blocks and attestations from the network. + /* Decoding of gossipsub objects from the network. + * + * The decoding is done in the message handler as it has access to to a `BeaconChain` and can + * therefore apply more efficient logic in decoding and verification. * * TODO: Apply efficient decoding/verification of these objects */ + /* Gossipsub Domain Decoding */ + // Note: These are not generics as type-specific verification will need to be applied. fn decode_gossip_block( &self, beacon_block: Vec, @@ -274,6 +311,29 @@ impl MessageHandler { Attestation::from_ssz_bytes(&beacon_block) } + fn decode_gossip_exit(&self, voluntary_exit: Vec) -> Result { + //TODO: Apply verification before decoding. + VoluntaryExit::from_ssz_bytes(&voluntary_exit) + } + + fn decode_gossip_proposer_slashing( + &self, + proposer_slashing: Vec, + ) -> Result { + //TODO: Apply verification before decoding. + ProposerSlashing::from_ssz_bytes(&proposer_slashing) + } + + fn decode_gossip_attestation_slashing( + &self, + attester_slashing: Vec, + ) -> Result, DecodeError> { + //TODO: Apply verification before decoding. + AttesterSlashing::from_ssz_bytes(&attester_slashing) + } + + /* Req/Resp Domain Decoding */ + /// Verifies and decodes the ssz-encoded block bodies received from peers. fn decode_block_bodies( &self, diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index b34259f5a..086ccc5be 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -9,7 +9,6 @@ use std::fs; use std::path::PathBuf; pub const DEFAULT_DATA_DIR: &str = ".lighthouse"; - pub const CLIENT_CONFIG_FILENAME: &str = "beacon-node.toml"; pub const ETH2_CONFIG_FILENAME: &str = "eth2-spec.toml"; pub const TESTNET_CONFIG_FILENAME: &str = "testnet.toml"; @@ -214,14 +213,7 @@ fn main() { .help("The title of the spec constants for chain config.") .takes_value(true) .possible_values(&["info", "debug", "trace", "warn", "error", "crit"]) - .default_value("trace"), - ) - .arg( - Arg::with_name("verbosity") - .short("v") - .multiple(true) - .help("Sets the verbosity level") - .takes_value(true), + .default_value("info"), ) .get_matches(); @@ -241,13 +233,6 @@ fn main() { _ => unreachable!("guarded by clap"), }; - let drain = match matches.occurrences_of("verbosity") { - 0 => drain.filter_level(Level::Info), - 1 => drain.filter_level(Level::Debug), - 2 => drain.filter_level(Level::Trace), - _ => drain.filter_level(Level::Trace), - }; - let mut log = slog::Logger::root(drain.fuse(), o!()); let data_dir = match matches