diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 1a27de406..176625d77 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -1,3 +1,4 @@ +use crate::Eth2Config; use clap::ArgMatches; use http_server::HttpServerConfig; use network::NetworkConfig; @@ -56,8 +57,6 @@ impl Default for Config { log_file: PathBuf::from(""), db_type: "disk".to_string(), db_name: "chain_db".to_string(), - // Note: there are no default bootnodes specified. - // Once bootnodes are established, add them here. network: NetworkConfig::new(), rpc: rpc::RPCConfig::default(), http: HttpServerConfig::default(), @@ -129,6 +128,15 @@ impl Config { self.data_dir = PathBuf::from(dir); }; + if let Some(default_spec) = args.value_of("default-spec") { + match default_spec { + "mainnet" => self.spec_constants = Eth2Config::mainnet().spec_constants, + "minimal" => self.spec_constants = Eth2Config::minimal().spec_constants, + "interop" => self.spec_constants = Eth2Config::interop().spec_constants, + _ => {} // not supported + } + } + if let Some(dir) = args.value_of("db") { self.db_type = dir.to_string(); }; diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 016973480..794b09712 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -7,8 +7,8 @@ edition = "2018" [dependencies] clap = "2.32.0" #SigP repository -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b" } -enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b", features = ["serde"] } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b0d3cf7b4b0fa6c555b64dbdd110673a05457abd" } +enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "b0d3cf7b4b0fa6c555b64dbdd110673a05457abd", features = ["serde"] } types = { path = "../../eth2/types" } serde = "1.0" serde_derive = "1.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 37e3419a3..b87f8a061 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -2,47 +2,52 @@ use crate::discovery::Discovery; use crate::rpc::{RPCEvent, RPCMessage, RPC}; use crate::{error, NetworkConfig}; use crate::{Topic, TopicHash}; +use crate::{BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC}; use futures::prelude::*; use libp2p::{ - core::{ - identity::Keypair, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, - }, + core::identity::Keypair, discv5::Discv5Event, gossipsub::{Gossipsub, GossipsubEvent}, + identify::{Identify, IdentifyEvent}, ping::{Ping, PingConfig, PingEvent}, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; -use slog::{o, trace, warn}; -use ssz::{ssz_encode, Decode, DecodeError, Encode}; +use slog::{debug, o, trace}; +use ssz::{ssz_encode, Encode}; use std::num::NonZeroU32; use std::time::Duration; -use types::{Attestation, BeaconBlock, EthSpec}; + +const MAX_IDENTIFY_ADDRESSES: usize = 20; /// Builds the network behaviour that manages the core protocols of eth2. /// This core behaviour is managed by `Behaviour` which adds peer management to all core /// behaviours. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] -pub struct Behaviour { +#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] +pub struct Behaviour { /// The routing pub-sub mechanism for eth2. gossipsub: Gossipsub, - /// The serenity RPC specified in the wire-0 protocol. - serenity_rpc: RPC, + /// The Eth2 RPC specified in the wire-0 protocol. + eth2_rpc: RPC, /// Keep regular connection to peers and disconnect if absent. + // TODO: Remove Libp2p ping in favour of discv5 ping. ping: Ping, - /// Kademlia for peer discovery. + // TODO: Using id for initial interop. This will be removed by mainnet. + /// Provides IP addresses and peer information. + identify: Identify, + /// Discovery behaviour. discovery: Discovery, #[behaviour(ignore)] /// The events generated by this behaviour to be consumed in the swarm poll. - events: Vec>, + events: Vec, /// Logger for behaviour actions. #[behaviour(ignore)] log: slog::Logger, } -impl Behaviour { +impl Behaviour { pub fn new( local_key: &Keypair, net_conf: &NetworkConfig, @@ -50,17 +55,25 @@ impl Behaviour { ) -> error::Result { let local_peer_id = local_key.public().clone().into_peer_id(); let behaviour_log = log.new(o!()); + let ping_config = PingConfig::new() .with_timeout(Duration::from_secs(30)) .with_interval(Duration::from_secs(20)) .with_max_failures(NonZeroU32::new(2).expect("2 != 0")) .with_keep_alive(false); + let identify = Identify::new( + "lighthouse/libp2p".into(), + version::version(), + local_key.public(), + ); + Ok(Behaviour { - serenity_rpc: RPC::new(log), + eth2_rpc: RPC::new(log), gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()), discovery: Discovery::new(local_key, net_conf, log)?, ping: Ping::new(ping_config), + identify, events: Vec::new(), log: behaviour_log, }) @@ -68,31 +81,20 @@ impl Behaviour { } // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour -impl NetworkBehaviourEventProcess - for Behaviour +impl NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, event: GossipsubEvent) { match event { GossipsubEvent::Message(gs_msg) => { trace!(self.log, "Received GossipEvent"; "msg" => format!("{:?}", gs_msg)); - let pubsub_message = match PubsubMessage::from_ssz_bytes(&gs_msg.data) { - //TODO: Punish peer on error - Err(e) => { - warn!( - self.log, - "Received undecodable message from Peer {:?} error", gs_msg.source; - "error" => format!("{:?}", e) - ); - return; - } - Ok(msg) => msg, - }; + let msg = PubsubMessage::from_topics(&gs_msg.topics, gs_msg.data); self.events.push(BehaviourEvent::GossipMessage { source: gs_msg.source, topics: gs_msg.topics, - message: Box::new(pubsub_message), + message: msg, }); } GossipsubEvent::Subscribed { .. } => {} @@ -101,8 +103,8 @@ impl NetworkBehaviourEventProces } } -impl NetworkBehaviourEventProcess - for Behaviour +impl NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, event: RPCMessage) { match event { @@ -119,19 +121,19 @@ impl NetworkBehaviourEventProces } } -impl NetworkBehaviourEventProcess - for Behaviour +impl NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, _event: PingEvent) { // not interested in ping responses at the moment. } } -impl Behaviour { +impl Behaviour { /// Consumes the events list when polled. fn poll( &mut self, - ) -> Async>> { + ) -> Async> { if !self.events.is_empty() { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); } @@ -140,8 +142,36 @@ impl Behaviour { } } -impl NetworkBehaviourEventProcess - for Behaviour +impl NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, event: IdentifyEvent) { + match event { + IdentifyEvent::Identified { + peer_id, mut info, .. + } => { + if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES { + debug!( + self.log, + "More than 20 addresses have been identified, truncating" + ); + info.listen_addrs.truncate(MAX_IDENTIFY_ADDRESSES); + } + debug!(self.log, "Identified Peer"; "Peer" => format!("{}", peer_id), + "Protocol Version" => info.protocol_version, + "Agent Version" => info.agent_version, + "Listening Addresses" => format!("{:?}", info.listen_addrs), + "Protocols" => format!("{:?}", info.protocols) + ); + } + IdentifyEvent::Error { .. } => {} + IdentifyEvent::SendBack { .. } => {} + } + } +} + +impl NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, _event: Discv5Event) { // discv5 has no events to inject @@ -149,7 +179,7 @@ impl NetworkBehaviourEventProces } /// Implements the combined behaviour for the libp2p service. -impl Behaviour { +impl Behaviour { /* Pubsub behaviour functions */ /// Subscribes to a gossipsub topic. @@ -158,7 +188,7 @@ impl Behaviour { } /// Publishes a message on the pubsub (gossipsub) behaviour. - pub fn publish(&mut self, topics: Vec, message: PubsubMessage) { + pub fn publish(&mut self, topics: Vec, message: PubsubMessage) { let message_bytes = ssz_encode(&message); for topic in topics { self.gossipsub.publish(topic, message_bytes.clone()); @@ -169,7 +199,7 @@ impl Behaviour { /// Sends an RPC Request/Response via the RPC protocol. pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { - self.serenity_rpc.send_rpc(peer_id, rpc_event); + self.eth2_rpc.send_rpc(peer_id, rpc_event); } /* Discovery / Peer management functions */ @@ -179,99 +209,60 @@ impl Behaviour { } /// The types of events than can be obtained from polling the behaviour. -pub enum BehaviourEvent { +pub enum BehaviourEvent { RPC(PeerId, RPCEvent), PeerDialed(PeerId), PeerDisconnected(PeerId), GossipMessage { source: PeerId, topics: Vec, - message: Box>, + message: PubsubMessage, }, } /// Messages that are passed to and from the pubsub (Gossipsub) behaviour. #[derive(Debug, Clone, PartialEq)] -pub enum PubsubMessage { +pub enum PubsubMessage { /// Gossipsub message providing notification of a new block. - Block(BeaconBlock), + Block(Vec), /// Gossipsub message providing notification of a new attestation. - Attestation(Attestation), + Attestation(Vec), + /// Gossipsub message from an unknown topic. + Unknown(Vec), } -//TODO: Correctly encode/decode enums. Prefixing with integer for now. -impl Encode for PubsubMessage { +impl PubsubMessage { + /* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will + * need to be modified. + * + * Also note that a message can be associated with many topics. As soon as one of the topics is + * known we match. If none of the topics are known we return an unknown state. + */ + fn from_topics(topics: &Vec, data: Vec) -> Self { + for topic in topics { + match topic.as_str() { + BEACON_BLOCK_TOPIC => return PubsubMessage::Block(data), + BEACON_ATTESTATION_TOPIC => return PubsubMessage::Attestation(data), + _ => {} + } + } + PubsubMessage::Unknown(data) + } +} + +impl Encode for PubsubMessage { fn is_ssz_fixed_len() -> bool { false } fn ssz_append(&self, buf: &mut Vec) { - let offset = ::ssz_fixed_len() + as Encode>::ssz_fixed_len(); - - let mut encoder = ssz::SszEncoder::container(buf, offset); - match self { - PubsubMessage::Block(block_gossip) => { - encoder.append(&0_u32); - + PubsubMessage::Block(inner) + | PubsubMessage::Attestation(inner) + | PubsubMessage::Unknown(inner) => { // Encode the gossip as a Vec; - encoder.append(&block_gossip.as_ssz_bytes()); - } - PubsubMessage::Attestation(attestation_gossip) => { - encoder.append(&1_u32); - - // Encode the gossip as a Vec; - encoder.append(&attestation_gossip.as_ssz_bytes()); + buf.append(&mut inner.as_ssz_bytes()); } } - - encoder.finalize(); - } -} - -impl Decode for PubsubMessage { - fn is_ssz_fixed_len() -> bool { - false - } - - fn from_ssz_bytes(bytes: &[u8]) -> Result { - let mut builder = ssz::SszDecoderBuilder::new(&bytes); - - builder.register_type::()?; - builder.register_type::>()?; - - let mut decoder = builder.build()?; - - let id: u32 = decoder.decode_next()?; - let body: Vec = decoder.decode_next()?; - - match id { - 0 => Ok(PubsubMessage::Block(BeaconBlock::from_ssz_bytes(&body)?)), - 1 => Ok(PubsubMessage::Attestation(Attestation::from_ssz_bytes( - &body, - )?)), - _ => Err(DecodeError::BytesInvalid( - "Invalid PubsubMessage id".to_string(), - )), - } - } -} - -#[cfg(test)] -mod test { - use super::*; - use types::*; - - #[test] - fn ssz_encoding() { - let original = PubsubMessage::Block(BeaconBlock::::empty( - &MainnetEthSpec::default_spec(), - )); - - let encoded = ssz_encode(&original); - - let decoded = PubsubMessage::from_ssz_bytes(&encoded).unwrap(); - - assert_eq!(original, decoded); } } diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index 7391dba8a..ddf14cc04 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -1,12 +1,13 @@ use clap::ArgMatches; use enr::Enr; use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder}; +use libp2p::Multiaddr; use serde_derive::{Deserialize, Serialize}; use std::path::PathBuf; use std::time::Duration; /// The beacon node topic string to subscribe to. -pub const BEACON_PUBSUB_TOPIC: &str = "beacon_block"; +pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; pub const BEACON_ATTESTATION_TOPIC: &str = "beacon_attestation"; pub const SHARD_TOPIC_PREFIX: &str = "shard"; @@ -39,6 +40,9 @@ pub struct Config { /// List of nodes to initially connect to. pub boot_nodes: Vec, + /// List of libp2p nodes to initially connect to. + pub libp2p_nodes: Vec, + /// Client version pub client_version: String, @@ -60,12 +64,13 @@ impl Default for Config { discovery_port: 9000, max_peers: 10, //TODO: Set realistic values for production + // Note: This defaults topics to plain strings. Not hashes gs_config: GossipsubConfigBuilder::new() - .max_gossip_size(4_000_000) - .inactivity_timeout(Duration::from_secs(90)) + .max_transmit_size(1_000_000) .heartbeat_interval(Duration::from_secs(20)) .build(), boot_nodes: vec![], + libp2p_nodes: vec![], client_version: version::version(), topics: Vec::new(), } @@ -118,6 +123,21 @@ impl Config { .collect::, _>>()?; } + if let Some(libp2p_addresses_str) = args.value_of("libp2p-addresses") { + self.libp2p_nodes = libp2p_addresses_str + .split(',') + .map(|multiaddr| { + multiaddr + .parse() + .map_err(|_| format!("Invalid Multiaddr: {}", multiaddr)) + }) + .collect::, _>>()?; + } + + if let Some(topics_str) = args.value_of("topics") { + self.topics = topics_str.split(',').map(|s| s.into()).collect(); + } + if let Some(discovery_address_str) = args.value_of("discovery-address") { self.discovery_address = discovery_address_str .parse() diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index c2f008756..4c1794945 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -4,13 +4,11 @@ use crate::{error, NetworkConfig}; /// Currently using discv5 for peer discovery. /// use futures::prelude::*; -use libp2p::core::swarm::{ - ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, -}; -use libp2p::core::{identity::Keypair, Multiaddr, PeerId, ProtocolsHandler}; +use libp2p::core::{identity::Keypair, ConnectedPoint, Multiaddr, PeerId}; use libp2p::discv5::{Discv5, Discv5Event}; use libp2p::enr::{Enr, EnrBuilder, NodeId}; use libp2p::multiaddr::Protocol; +use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler}; use slog::{debug, info, o, warn}; use std::collections::HashSet; use std::fs::File; @@ -37,6 +35,9 @@ pub struct Discovery { /// The target number of connected peers on the libp2p interface. max_peers: usize, + /// directory to save ENR to + enr_dir: String, + /// The delay between peer discovery searches. peer_discovery_delay: Delay, @@ -54,9 +55,6 @@ pub struct Discovery { /// Logger for the discovery behaviour. log: slog::Logger, - - /// directory to save ENR to - enr_dir: String, } impl Discovery { diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 7a3b2e632..54a4f2a99 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -11,9 +11,9 @@ mod service; pub use behaviour::PubsubMessage; pub use config::{ - Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC, SHARD_TOPIC_PREFIX, + Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC, SHARD_TOPIC_PREFIX, }; -pub use libp2p::floodsub::{Topic, TopicBuilder, TopicHash}; +pub use libp2p::gossipsub::{Topic, TopicHash}; pub use libp2p::multiaddr; pub use libp2p::Multiaddr; pub use libp2p::{ diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 4e796f6fb..dbc32c5a4 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -5,23 +5,21 @@ use crate::rpc::protocol::{InboundFramed, OutboundFramed}; use core::marker::PhantomData; use fnv::FnvHashMap; use futures::prelude::*; -use libp2p::core::protocols_handler::{ +use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade}; +use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; -use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade}; use smallvec::SmallVec; use std::time::{Duration, Instant}; use tokio_io::{AsyncRead, AsyncWrite}; -use types::EthSpec; /// The time (in seconds) before a substream that is awaiting a response times out. pub const RESPONSE_TIMEOUT: u64 = 9; /// Implementation of `ProtocolsHandler` for the RPC protocol. -pub struct RPCHandler +pub struct RPCHandler where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, @@ -56,8 +54,8 @@ where /// After the given duration has elapsed, an inactive connection will shutdown. inactive_timeout: Duration, - /// Phantom EthSpec. - _phantom: PhantomData, + /// Marker to pin the generic stream. + _phantom: PhantomData, } /// An outbound substream is waiting a response from the user. @@ -90,10 +88,9 @@ where }, } -impl RPCHandler +impl RPCHandler where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { pub fn new( listen_protocol: SubstreamProtocol, @@ -145,20 +142,18 @@ where } } -impl Default for RPCHandler +impl Default for RPCHandler where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { fn default() -> Self { RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(30)) } } -impl ProtocolsHandler for RPCHandler +impl ProtocolsHandler for RPCHandler where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { type InEvent = RPCEvent; type OutEvent = RPCEvent; @@ -273,7 +268,11 @@ where Self::Error, > { if let Some(err) = self.pending_error.take() { - return Err(err); + // Returning an error here will result in dropping any peer that doesn't support any of + // the RPC protocols. For our immediate purposes we permit this and simply log that an + // upgrade was not supported. + // TODO: Add a logger to the handler for trace output. + dbg!(&err); } // return any events that need to be reported diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 88060e602..756a62e71 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -6,9 +6,9 @@ use futures::prelude::*; use handler::RPCHandler; -use libp2p::core::protocols_handler::ProtocolsHandler; -use libp2p::core::swarm::{ - ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, +use libp2p::core::ConnectedPoint; +use libp2p::swarm::{ + protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; use libp2p::{Multiaddr, PeerId}; pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse, RequestId}; @@ -16,7 +16,6 @@ pub use protocol::{RPCError, RPCProtocol, RPCRequest}; use slog::o; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; -use types::EthSpec; pub(crate) mod codec; mod handler; @@ -50,16 +49,16 @@ impl RPCEvent { /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. -pub struct RPC { +pub struct RPC { /// Queue of events to processed. events: Vec>, /// Pins the generic substream. - marker: PhantomData<(TSubstream, E)>, + marker: PhantomData<(TSubstream)>, /// Slog logger for RPC behaviour. _log: slog::Logger, } -impl RPC { +impl RPC { pub fn new(log: &slog::Logger) -> Self { let log = log.new(o!("Service" => "Libp2p-RPC")); RPC { @@ -80,12 +79,11 @@ impl RPC { } } -impl NetworkBehaviour for RPC +impl NetworkBehaviour for RPC where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { - type ProtocolsHandler = RPCHandler; + type ProtocolsHandler = RPCHandler; type OutEvent = RPCMessage; fn new_handler(&mut self) -> Self::ProtocolsHandler { diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 8729de3a7..b606fc743 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -8,7 +8,7 @@ use futures::{ future::{self, FutureResult}, sink, stream, Sink, Stream, }; -use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; use std::io; use std::time::Duration; use tokio::codec::Framed; @@ -28,24 +28,22 @@ const REQUEST_TIMEOUT: u64 = 3; pub struct RPCProtocol; impl UpgradeInfo for RPCProtocol { - type Info = RawProtocolId; + type Info = ProtocolId; type InfoIter = Vec; fn protocol_info(&self) -> Self::InfoIter { vec![ - ProtocolId::new("hello", "1.0.0", "ssz").into(), - ProtocolId::new("goodbye", "1.0.0", "ssz").into(), - ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into(), - ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into(), - ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into(), + ProtocolId::new("hello", "1.0.0", "ssz"), + ProtocolId::new("goodbye", "1.0.0", "ssz"), + ProtocolId::new("beacon_block_roots", "1.0.0", "ssz"), + ProtocolId::new("beacon_block_headers", "1.0.0", "ssz"), + ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz"), ] } } -/// The raw protocol id sent over the wire. -type RawProtocolId = Vec; - /// Tracks the types in a protocol id. +#[derive(Clone)] pub struct ProtocolId { /// The rpc message type/name. pub message_name: String, @@ -55,44 +53,31 @@ pub struct ProtocolId { /// The encoding of the RPC. pub encoding: String, + + /// The protocol id that is formed from the above fields. + protocol_id: String, } /// An RPC protocol ID. impl ProtocolId { pub fn new(message_name: &str, version: &str, encoding: &str) -> Self { + let protocol_id = format!( + "{}/{}/{}/{}", + PROTOCOL_PREFIX, message_name, version, encoding + ); + ProtocolId { message_name: message_name.into(), version: version.into(), encoding: encoding.into(), + protocol_id, } } - - /// Converts a raw RPC protocol id string into an `RPCProtocolId` - pub fn from_bytes(bytes: &[u8]) -> Result { - let protocol_string = String::from_utf8(bytes.to_vec()) - .map_err(|_| RPCError::InvalidProtocol("Invalid protocol Id"))?; - let protocol_list: Vec<&str> = protocol_string.as_str().split('/').take(7).collect(); - - if protocol_list.len() != 7 { - return Err(RPCError::InvalidProtocol("Not enough '/'")); - } - - Ok(ProtocolId { - message_name: protocol_list[4].into(), - version: protocol_list[5].into(), - encoding: protocol_list[6].into(), - }) - } } -impl Into for ProtocolId { - fn into(self) -> RawProtocolId { - format!( - "{}/{}/{}/{}", - PROTOCOL_PREFIX, self.message_name, self.version, self.encoding - ) - .as_bytes() - .to_vec() +impl ProtocolName for ProtocolId { + fn protocol_name(&self) -> &[u8] { + self.protocol_id.as_bytes() } } @@ -127,16 +112,11 @@ where fn upgrade_inbound( self, socket: upgrade::Negotiated, - protocol: RawProtocolId, + protocol: ProtocolId, ) -> Self::Future { - // TODO: Verify this - let protocol_id = - ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols"); - - match protocol_id.encoding.as_str() { + match protocol.encoding.as_str() { "ssz" | _ => { - let ssz_codec = - BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, MAX_RPC_SIZE)); + let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE)); let codec = InboundCodec::SSZ(ssz_codec); Framed::new(socket, codec) .into_future() @@ -171,7 +151,7 @@ pub enum RPCRequest { } impl UpgradeInfo for RPCRequest { - type Info = RawProtocolId; + type Info = ProtocolId; type InfoIter = Vec; // add further protocols as we support more encodings/versions @@ -182,22 +162,25 @@ impl UpgradeInfo for RPCRequest { /// Implements the encoding per supported protocol for RPCRequest. impl RPCRequest { - pub fn supported_protocols(&self) -> Vec { + pub fn supported_protocols(&self) -> Vec { match self { // add more protocols when versions/encodings are supported - RPCRequest::Hello(_) => vec![ProtocolId::new("hello", "1.0.0", "ssz").into()], - RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1.0.0", "ssz").into()], + RPCRequest::Hello(_) => vec![ + ProtocolId::new("hello", "1.0.0", "ssz"), + ProtocolId::new("goodbye", "1.0.0", "ssz"), + ], + RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1.0.0", "ssz")], RPCRequest::BeaconBlockRoots(_) => { - vec![ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into()] + vec![ProtocolId::new("beacon_block_roots", "1.0.0", "ssz")] } RPCRequest::BeaconBlockHeaders(_) => { - vec![ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into()] + vec![ProtocolId::new("beacon_block_headers", "1.0.0", "ssz")] } RPCRequest::BeaconBlockBodies(_) => { - vec![ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into()] + vec![ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz")] } RPCRequest::BeaconChainState(_) => { - vec![ProtocolId::new("beacon_block_state", "1.0.0", "ssz").into()] + vec![ProtocolId::new("beacon_block_state", "1.0.0", "ssz")] } } } @@ -230,12 +213,9 @@ where socket: upgrade::Negotiated, protocol: Self::Info, ) -> Self::Future { - let protocol_id = - ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols"); - - match protocol_id.encoding.as_str() { + match protocol.encoding.as_str() { "ssz" | _ => { - let ssz_codec = BaseOutboundCodec::new(SSZOutboundCodec::new(protocol_id, 4096)); + let ssz_codec = BaseOutboundCodec::new(SSZOutboundCodec::new(protocol, 4096)); let codec = OutboundCodec::SSZ(ssz_codec); Framed::new(socket, codec).send(self) } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 05ae9e473..316aa0579 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -3,8 +3,8 @@ use crate::error; use crate::multiaddr::Protocol; use crate::rpc::RPCEvent; use crate::NetworkConfig; -use crate::{TopicBuilder, TopicHash}; -use crate::{BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC}; +use crate::{Topic, TopicHash}; +use crate::{BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC}; use futures::prelude::*; use futures::Stream; use libp2p::core::{ @@ -21,25 +21,24 @@ use std::fs::File; use std::io::prelude::*; use std::io::{Error, ErrorKind}; use std::time::Duration; -use types::EthSpec; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; -type Libp2pBehaviour = Behaviour, E>; +type Libp2pBehaviour = Behaviour>; const NETWORK_KEY_FILENAME: &str = "key"; /// The configuration and state of the libp2p components for the beacon node. -pub struct Service { +pub struct Service { /// The libp2p Swarm handler. //TODO: Make this private - pub swarm: Swarm>, + pub swarm: Swarm, /// This node's PeerId. _local_peer_id: PeerId, /// The libp2p logger handle. pub log: slog::Logger, } -impl Service { +impl Service { pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result { debug!(log, "Network-libp2p Service starting"); @@ -76,18 +75,35 @@ impl Service { ), }; + // attempt to connect to user-input libp2p nodes + for multiaddr in config.libp2p_nodes { + match Swarm::dial_addr(&mut swarm, multiaddr.clone()) { + Ok(()) => debug!(log, "Dialing libp2p node: {}", multiaddr), + Err(err) => debug!( + log, + "Could not connect to node: {} error: {:?}", multiaddr, err + ), + }; + } + // subscribe to default gossipsub topics let mut topics = vec![]; //TODO: Handle multiple shard attestations. For now we simply use a separate topic for - //attestations - topics.push(BEACON_ATTESTATION_TOPIC.to_string()); - topics.push(BEACON_PUBSUB_TOPIC.to_string()); - topics.append(&mut config.topics.clone()); + // attestations + topics.push(Topic::new(BEACON_ATTESTATION_TOPIC.into())); + topics.push(Topic::new(BEACON_BLOCK_TOPIC.into())); + topics.append( + &mut config + .topics + .iter() + .cloned() + .map(|s| Topic::new(s)) + .collect(), + ); let mut subscribed_topics = vec![]; for topic in topics { - let t = TopicBuilder::new(topic.clone()).build(); - if swarm.subscribe(t) { + if swarm.subscribe(topic.clone()) { trace!(log, "Subscribed to topic: {:?}", topic); subscribed_topics.push(topic); } else { @@ -104,8 +120,8 @@ impl Service { } } -impl Stream for Service { - type Item = Libp2pEvent; +impl Stream for Service { + type Item = Libp2pEvent; type Error = crate::error::Error; fn poll(&mut self) -> Poll, Self::Error> { @@ -119,7 +135,7 @@ impl Stream for Service { topics, message, } => { - trace!(self.log, "Pubsub message received: {:?}", message); + trace!(self.log, "Gossipsub message received"; "Message" => format!("{:?}", message)); return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage { source, topics, @@ -179,7 +195,7 @@ fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox) } /// Events that can be obtained from polling the Libp2p Service. -pub enum Libp2pEvent { +pub enum Libp2pEvent { /// An RPC response request has been received on the swarm. RPC(PeerId, RPCEvent), /// Initiated the connection to a new peer. @@ -190,7 +206,7 @@ pub enum Libp2pEvent { PubsubMessage { source: PeerId, topics: Vec, - message: Box>, + message: PubsubMessage, }, } diff --git a/beacon_node/http_server/src/api.rs b/beacon_node/http_server/src/api.rs index a91080899..8cb023b02 100644 --- a/beacon_node/http_server/src/api.rs +++ b/beacon_node/http_server/src/api.rs @@ -64,7 +64,7 @@ fn handle_fork(req: &mut Request) -> IronResult( pub fn start_service( config: &HttpServerConfig, executor: &TaskExecutor, - _network_chan: mpsc::UnboundedSender>, + _network_chan: mpsc::UnboundedSender, beacon_chain: Arc>, db_path: PathBuf, metrics_registry: Registry, diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index eaddce533..72a507ad7 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -14,7 +14,7 @@ use slog::{debug, warn}; use ssz::{Decode, DecodeError}; use std::sync::Arc; use tokio::sync::mpsc; -use types::{BeaconBlockHeader, EthSpec}; +use types::{Attestation, BeaconBlock, BeaconBlockHeader}; /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { @@ -23,14 +23,14 @@ pub struct MessageHandler { /// The syncing framework. sync: SimpleSync, /// The context required to send messages to, and process messages from peers. - network_context: NetworkContext, + network_context: NetworkContext, /// The `MessageHandler` logger. log: slog::Logger, } /// Types of messages the handler can receive. #[derive(Debug)] -pub enum HandlerMessage { +pub enum HandlerMessage { /// We have initiated a connection to a new peer. PeerDialed(PeerId), /// Peer has disconnected, @@ -38,17 +38,17 @@ pub enum HandlerMessage { /// An RPC response/request has been received. RPC(PeerId, RPCEvent), /// A gossip message has been received. - PubsubMessage(PeerId, Box>), + PubsubMessage(PeerId, PubsubMessage), } impl MessageHandler { /// Initializes and runs the MessageHandler. pub fn spawn( beacon_chain: Arc>, - network_send: mpsc::UnboundedSender>, + network_send: mpsc::UnboundedSender, executor: &tokio::runtime::TaskExecutor, log: slog::Logger, - ) -> error::Result>> { + ) -> error::Result> { debug!(log, "Service starting"); let (handler_send, handler_recv) = mpsc::unbounded_channel(); @@ -78,7 +78,7 @@ impl MessageHandler { } /// Handle all messages incoming from the network service. - fn handle_message(&mut self, message: HandlerMessage) { + fn handle_message(&mut self, message: HandlerMessage) { match message { // we have initiated a connection to a peer HandlerMessage::PeerDialed(peer_id) => { @@ -94,7 +94,7 @@ impl MessageHandler { } // we have received an RPC message request/response HandlerMessage::PubsubMessage(peer_id, gossip) => { - self.handle_gossip(peer_id, *gossip); + self.handle_gossip(peer_id, gossip); } } } @@ -218,6 +218,62 @@ impl MessageHandler { } } + /// Handle various RPC errors + fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { + //TODO: Handle error correctly + warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error)); + } + + /// Handle RPC messages + fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { + match gossip_message { + PubsubMessage::Block(message) => match self.decode_gossip_block(message) { + Err(e) => { + debug!(self.log, "Invalid Gossiped Beacon Block"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e)); + } + Ok(block) => { + let _should_forward_on = + self.sync + .on_block_gossip(peer_id, block, &mut self.network_context); + } + }, + PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) { + Err(e) => { + debug!(self.log, "Invalid Gossiped Attestation"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e)); + } + Ok(attestation) => { + self.sync + .on_attestation_gossip(peer_id, attestation, &mut self.network_context) + } + }, + PubsubMessage::Unknown(message) => { + // Received a message from an unknown topic. Ignore for now + debug!(self.log, "Unknown Gossip Message"; "Peer" => format!("{}", peer_id), "Message" => format!("{:?}", message)); + } + } + } + + /* Decoding of blocks and attestations from the network. + * + * TODO: Apply efficient decoding/verification of these objects + */ + + fn decode_gossip_block( + &self, + beacon_block: Vec, + ) -> Result, DecodeError> { + //TODO: Apply verification before decoding. + BeaconBlock::from_ssz_bytes(&beacon_block) + } + + fn decode_gossip_attestation( + &self, + beacon_block: Vec, + ) -> Result, DecodeError> { + //TODO: Apply verification before decoding. + Attestation::from_ssz_bytes(&beacon_block) + } + /// Verifies and decodes the ssz-encoded block bodies received from peers. fn decode_block_bodies( &self, @@ -241,39 +297,18 @@ impl MessageHandler { //TODO: Implement faster header verification before decoding entirely Vec::from_ssz_bytes(&headers_response.headers) } - - /// Handle various RPC errors - fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { - //TODO: Handle error correctly - warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error)); - } - - /// Handle RPC messages - fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { - match gossip_message { - PubsubMessage::Block(message) => { - let _should_forward_on = - self.sync - .on_block_gossip(peer_id, message, &mut self.network_context); - } - PubsubMessage::Attestation(message) => { - self.sync - .on_attestation_gossip(peer_id, message, &mut self.network_context) - } - } - } } // TODO: RPC Rewrite makes this struct fairly pointless -pub struct NetworkContext { +pub struct NetworkContext { /// The network channel to relay messages to the Network service. - network_send: mpsc::UnboundedSender>, + network_send: mpsc::UnboundedSender, /// The `MessageHandler` logger. log: slog::Logger, } -impl NetworkContext { - pub fn new(network_send: mpsc::UnboundedSender>, log: slog::Logger) -> Self { +impl NetworkContext { + pub fn new(network_send: mpsc::UnboundedSender, log: slog::Logger) -> Self { Self { network_send, log } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7a21f7f28..e5ca2a917 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -14,13 +14,12 @@ use slog::{debug, info, o, trace}; use std::sync::Arc; use tokio::runtime::TaskExecutor; use tokio::sync::{mpsc, oneshot}; -use types::EthSpec; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { - libp2p_service: Arc>>, + libp2p_service: Arc>, _libp2p_exit: oneshot::Sender<()>, - _network_send: mpsc::UnboundedSender>, + _network_send: mpsc::UnboundedSender, _phantom: PhantomData, //message_handler: MessageHandler, //message_handler_send: Sender } @@ -31,9 +30,9 @@ impl Service { config: &NetworkConfig, executor: &TaskExecutor, log: slog::Logger, - ) -> error::Result<(Arc, mpsc::UnboundedSender>)> { + ) -> error::Result<(Arc, mpsc::UnboundedSender)> { // build the network channel - let (network_send, network_recv) = mpsc::unbounded_channel::>(); + let (network_send, network_recv) = mpsc::unbounded_channel::(); // launch message handler thread let message_handler_log = log.new(o!("Service" => "MessageHandler")); let message_handler_send = MessageHandler::spawn( @@ -65,15 +64,15 @@ impl Service { Ok((Arc::new(network_service), network_send)) } - pub fn libp2p_service(&self) -> Arc>> { + pub fn libp2p_service(&self) -> Arc> { self.libp2p_service.clone() } } -fn spawn_service( - libp2p_service: Arc>>, - network_recv: mpsc::UnboundedReceiver>, - message_handler_send: mpsc::UnboundedSender>, +fn spawn_service( + libp2p_service: Arc>, + network_recv: mpsc::UnboundedReceiver, + message_handler_send: mpsc::UnboundedSender, executor: &TaskExecutor, log: slog::Logger, ) -> error::Result> { @@ -99,10 +98,10 @@ fn spawn_service( } //TODO: Potentially handle channel errors -fn network_service( - libp2p_service: Arc>>, - mut network_recv: mpsc::UnboundedReceiver>, - mut message_handler_send: mpsc::UnboundedSender>, +fn network_service( + libp2p_service: Arc>, + mut network_recv: mpsc::UnboundedReceiver, + mut message_handler_send: mpsc::UnboundedSender, log: slog::Logger, ) -> impl futures::Future { futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> { @@ -119,7 +118,7 @@ fn network_service( }, NetworkMessage::Publish { topics, message } => { debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); - libp2p_service.lock().swarm.publish(topics, *message); + libp2p_service.lock().swarm.publish(topics, message); } }, Ok(Async::NotReady) => break, @@ -176,14 +175,14 @@ fn network_service( /// Types of messages that the network service can receive. #[derive(Debug)] -pub enum NetworkMessage { +pub enum NetworkMessage { /// Send a message to libp2p service. //TODO: Define typing for messages across the wire Send(PeerId, OutgoingMessage), /// Publish a message to pubsub mechanism. Publish { topics: Vec, - message: Box>, + message: PubsubMessage, }, } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 215e37e7f..c3271888a 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -123,7 +123,7 @@ impl SimpleSync { /// Handle the connection of a new peer. /// /// Sends a `Hello` message to the peer. - pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { + pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { info!(self.log, "PeerConnected"; "peer" => format!("{:?}", peer_id)); network.send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain))); @@ -137,7 +137,7 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, hello: HelloMessage, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); @@ -156,7 +156,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, hello: HelloMessage, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id)); @@ -171,7 +171,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, hello: HelloMessage, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { let remote = PeerSyncInfo::from(hello); let local = PeerSyncInfo::from(&self.chain); @@ -278,7 +278,7 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, req: BeaconBlockRootsRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { let state = &self.chain.head().beacon_state; @@ -325,7 +325,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, res: BeaconBlockRootsResponse, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -389,7 +389,7 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, req: BeaconBlockHeadersRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { let state = &self.chain.head().beacon_state; @@ -441,7 +441,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, headers: Vec, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -473,7 +473,7 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, req: BeaconBlockBodiesRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { let block_bodies: Vec> = req .block_roots @@ -519,7 +519,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, res: DecodedBeaconBlockBodiesResponse, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -558,7 +558,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, block: BeaconBlock, - network: &mut NetworkContext, + network: &mut NetworkContext, ) -> bool { if let Some(outcome) = self.process_block(peer_id.clone(), block.clone(), network, &"gossip") @@ -628,7 +628,7 @@ impl SimpleSync { &mut self, _peer_id: PeerId, msg: Attestation, - _network: &mut NetworkContext, + _network: &mut NetworkContext, ) { match self.chain.process_attestation(msg) { Ok(()) => info!(self.log, "ImportedAttestation"; "source" => "gossip"), @@ -643,7 +643,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, req: BeaconBlockRootsRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { // Potentially set state to sync. if self.state == SyncState::Idle && req.count > SLOT_IMPORT_TOLERANCE { @@ -667,7 +667,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, req: BeaconBlockHeadersRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -684,7 +684,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, req: BeaconBlockBodiesRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -720,7 +720,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, block_root: Hash256, - network: &mut NetworkContext, + network: &mut NetworkContext, source: &str, ) -> Option { match self.import_queue.attempt_complete_block(block_root) { @@ -813,7 +813,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, block: BeaconBlock, - network: &mut NetworkContext, + network: &mut NetworkContext, source: &str, ) -> Option { let processing_result = self.chain.process_block(block.clone()); @@ -915,9 +915,9 @@ fn hello_message(beacon_chain: &BeaconChain) -> HelloMes let state = &beacon_chain.head().beacon_state; HelloMessage { - //TODO: Correctly define the chain/network id - network_id: spec.chain_id, - chain_id: u64::from(spec.chain_id), + network_id: spec.network_id, + //TODO: Correctly define the chain id + chain_id: spec.network_id as u64, latest_finalized_root: state.finalized_checkpoint.root, latest_finalized_epoch: state.finalized_checkpoint.epoch, best_root: beacon_chain.head().beacon_block_root, diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index 20425d292..1f507f0f3 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -1,6 +1,6 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PubsubMessage; -use eth2_libp2p::TopicBuilder; +use eth2_libp2p::Topic; use eth2_libp2p::BEACON_ATTESTATION_TOPIC; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; @@ -11,7 +11,7 @@ use protos::services::{ }; use protos::services_grpc::AttestationService; use slog::{error, info, trace, warn}; -use ssz::{ssz_encode, Decode}; +use ssz::{ssz_encode, Decode, Encode}; use std::sync::Arc; use tokio::sync::mpsc; use types::Attestation; @@ -19,7 +19,7 @@ use types::Attestation; #[derive(Clone)] pub struct AttestationServiceInstance { pub chain: Arc>, - pub network_chan: mpsc::UnboundedSender>, + pub network_chan: mpsc::UnboundedSender, pub log: slog::Logger, } @@ -144,13 +144,13 @@ impl AttestationService for AttestationServiceInstance { ); // valid attestation, propagate to the network - let topic = TopicBuilder::new(BEACON_ATTESTATION_TOPIC).build(); - let message = PubsubMessage::Attestation(attestation); + let topic = Topic::new(BEACON_ATTESTATION_TOPIC.into()); + let message = PubsubMessage::Attestation(attestation.as_ssz_bytes()); self.network_chan .try_send(NetworkMessage::Publish { topics: vec![topic], - message: Box::new(message), + message: message, }) .unwrap_or_else(|e| { error!( diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index b42bbb208..b1a67399e 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -1,6 +1,6 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; -use eth2_libp2p::BEACON_PUBSUB_TOPIC; -use eth2_libp2p::{PubsubMessage, TopicBuilder}; +use eth2_libp2p::BEACON_BLOCK_TOPIC; +use eth2_libp2p::{PubsubMessage, Topic}; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use network::NetworkMessage; @@ -11,7 +11,7 @@ use protos::services::{ use protos::services_grpc::BeaconBlockService; use slog::Logger; use slog::{error, info, trace, warn}; -use ssz::{ssz_encode, Decode}; +use ssz::{ssz_encode, Decode, Encode}; use std::sync::Arc; use tokio::sync::mpsc; use types::{BeaconBlock, Signature, Slot}; @@ -19,7 +19,7 @@ use types::{BeaconBlock, Signature, Slot}; #[derive(Clone)] pub struct BeaconBlockServiceInstance { pub chain: Arc>, - pub network_chan: mpsc::UnboundedSender>, + pub network_chan: mpsc::UnboundedSender, pub log: Logger, } @@ -106,14 +106,14 @@ impl BeaconBlockService for BeaconBlockServiceInstance { ); // get the network topic to send on - let topic = TopicBuilder::new(BEACON_PUBSUB_TOPIC).build(); - let message = PubsubMessage::Block(block); + let topic = Topic::new(BEACON_BLOCK_TOPIC.into()); + let message = PubsubMessage::Block(block.as_ssz_bytes()); // Publish the block to the p2p network via gossipsub. self.network_chan .try_send(NetworkMessage::Publish { topics: vec![topic], - message: Box::new(message), + message: message, }) .unwrap_or_else(|e| { error!( diff --git a/beacon_node/rpc/src/beacon_node.rs b/beacon_node/rpc/src/beacon_node.rs index 631601ac9..5d635c9d1 100644 --- a/beacon_node/rpc/src/beacon_node.rs +++ b/beacon_node/rpc/src/beacon_node.rs @@ -37,7 +37,7 @@ impl BeaconNodeService for BeaconNodeServiceInstance { node_info.set_fork(fork); node_info.set_genesis_time(genesis_time); node_info.set_genesis_slot(spec.genesis_slot.as_u64()); - node_info.set_chain_id(u32::from(spec.chain_id)); + node_info.set_network_id(u32::from(spec.network_id)); // send the node_info the requester let error_log = self.log.clone(); diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index de9039505..eef009292 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -25,7 +25,7 @@ use tokio::sync::mpsc; pub fn start_server( config: &RPCConfig, executor: &TaskExecutor, - network_chan: mpsc::UnboundedSender>, + network_chan: mpsc::UnboundedSender, beacon_chain: Arc>, log: &slog::Logger, ) -> exit_future::Signal { diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index dd0c695b4..b34259f5a 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -4,7 +4,7 @@ use clap::{App, Arg}; use client::{ClientConfig, Eth2Config}; use env_logger::{Builder, Env}; use eth2_config::{read_from_file, write_to_file}; -use slog::{crit, o, Drain, Level}; +use slog::{crit, o, warn, Drain, Level}; use std::fs; use std::path::PathBuf; @@ -52,10 +52,17 @@ fn main() { .arg( Arg::with_name("listen-address") .long("listen-address") - .value_name("Address") + .value_name("ADDRESS") .help("The address lighthouse will listen for UDP and TCP connections. (default 127.0.0.1).") .takes_value(true), ) + .arg( + Arg::with_name("port") + .long("port") + .value_name("PORT") + .help("The TCP/UDP port to listen on. The UDP port can be modified by the --discovery-port flag.") + .takes_value(true), + ) .arg( Arg::with_name("maxpeers") .long("maxpeers") @@ -70,27 +77,34 @@ fn main() { .help("One or more comma-delimited base64-encoded ENR's to bootstrap the p2p network.") .takes_value(true), ) - .arg( - Arg::with_name("port") - .long("port") - .value_name("Lighthouse Port") - .help("The TCP/UDP port to listen on. The UDP port can be modified by the --discovery-port flag.") - .takes_value(true), - ) .arg( Arg::with_name("discovery-port") .long("disc-port") - .value_name("DiscoveryPort") + .value_name("PORT") .help("The discovery UDP port.") .takes_value(true), ) .arg( Arg::with_name("discovery-address") .long("discovery-address") - .value_name("Address") + .value_name("ADDRESS") .help("The IP address to broadcast to other peers on how to reach this node.") .takes_value(true), ) + .arg( + Arg::with_name("topics") + .long("topics") + .value_name("STRING") + .help("One or more comma-delimited gossipsub topic strings to subscribe to.") + .takes_value(true), + ) + .arg( + Arg::with_name("libp2p-addresses") + .long("libp2p-addresses") + .value_name("MULTIADDR") + .help("One or more comma-delimited multiaddrs to manually connect to a libp2p peer without an ENR.") + .takes_value(true), + ) /* * gRPC parameters. */ @@ -136,6 +150,7 @@ fn main() { .help("Listen port for the HTTP server.") .takes_value(true), ) + /* Client related arguments */ .arg( Arg::with_name("api") .long("api") @@ -178,12 +193,9 @@ fn main() { .long("default-spec") .value_name("TITLE") .short("default-spec") - .help("Specifies the default eth2 spec to be used. Overridden by any spec loaded - from disk. A spec will be written to disk after this flag is used, so it is - primarily used for creating eth2 spec files.") + .help("Specifies the default eth2 spec to be used. This will override any spec written to disk and will therefore be used by default in future instances.") .takes_value(true) - .possible_values(&["mainnet", "minimal"]) - .default_value("minimal"), + .possible_values(&["mainnet", "minimal", "interop"]) ) .arg( Arg::with_name("recent-genesis") @@ -202,7 +214,7 @@ fn main() { .help("The title of the spec constants for chain config.") .takes_value(true) .possible_values(&["info", "debug", "trace", "warn", "error", "crit"]) - .default_value("info"), + .default_value("trace"), ) .arg( Arg::with_name("verbosity") @@ -301,29 +313,58 @@ fn main() { let eth2_config_path = data_dir.join(ETH2_CONFIG_FILENAME); - // Attempt to load the `Eth2Config` from file. + // Initialise the `Eth2Config`. // - // If the file doesn't exist, create a default one depending on the CLI flags. - let mut eth2_config = match read_from_file::(eth2_config_path.clone()) { - Ok(Some(c)) => c, - Ok(None) => { - let default = match matches.value_of("default-spec") { - Some("mainnet") => Eth2Config::mainnet(), - Some("minimal") => Eth2Config::minimal(), - _ => unreachable!(), // Guarded by slog. - }; - if let Err(e) = write_to_file(eth2_config_path, &default) { - crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); - return; - } - default - } + // If a CLI parameter is set, overwrite any config file present. + // If a parameter is not set, use either the config file present or default to minimal. + let cli_config = match matches.value_of("default-spec") { + Some("mainnet") => Some(Eth2Config::mainnet()), + Some("minimal") => Some(Eth2Config::minimal()), + Some("interop") => Some(Eth2Config::interop()), + _ => None, + }; + // if a CLI flag is specified, write the new config if it doesn't exist, + // otherwise notify the user that the file will not be written. + let eth2_config_from_file = match read_from_file::(eth2_config_path.clone()) { + Ok(config) => config, Err(e) => { - crit!(log, "Failed to load/generate an Eth2Config"; "error" => format!("{:?}", e)); + crit!(log, "Failed to read the Eth2Config from file"; "error" => format!("{:?}", e)); return; } }; + let mut eth2_config = { + if let Some(cli_config) = cli_config { + if eth2_config_from_file.is_none() { + // write to file if one doesn't exist + if let Err(e) = write_to_file(eth2_config_path, &cli_config) { + crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); + return; + } + } else { + warn!( + log, + "Eth2Config file exists. Configuration file is ignored, using default" + ); + } + cli_config + } else { + // CLI config not specified, read from disk + match eth2_config_from_file { + Some(config) => config, + None => { + // set default to minimal + let eth2_config = Eth2Config::minimal(); + if let Err(e) = write_to_file(eth2_config_path, ð2_config) { + crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); + return; + } + eth2_config + } + } + } + }; + // Update the eth2 config with any CLI flags. match eth2_config.apply_cli_args(&matches) { Ok(()) => (), @@ -333,6 +374,12 @@ fn main() { } }; + // check to ensure the spec constants between the client and eth2_config match + if eth2_config.spec_constants != client_config.spec_constants { + crit!(log, "Specification constants do not match."; "client_config" => format!("{}", client_config.spec_constants), "eth2_config" => format!("{}", eth2_config.spec_constants)); + return; + } + // Start the node using a `tokio` executor. match run::run_beacon_node(client_config, eth2_config, &log) { Ok(_) => {} diff --git a/beacon_node/src/run.rs b/beacon_node/src/run.rs index 010993988..c16d23e5f 100644 --- a/beacon_node/src/run.rs +++ b/beacon_node/src/run.rs @@ -13,7 +13,7 @@ use tokio::runtime::Builder; use tokio::runtime::Runtime; use tokio::runtime::TaskExecutor; use tokio_timer::clock::Clock; -use types::{MainnetEthSpec, MinimalEthSpec}; +use types::{InteropEthSpec, MainnetEthSpec, MinimalEthSpec}; /// Reads the configuration and initializes a `BeaconChain` with the required types and parameters. /// @@ -90,6 +90,22 @@ pub fn run_beacon_node( runtime, log, ), + ("disk", "interop") => run::>( + &db_path, + client_config, + eth2_config, + executor, + runtime, + log, + ), + ("memory", "interop") => run::>( + &db_path, + client_config, + eth2_config, + executor, + runtime, + log, + ), (db_type, spec) => { error!(log, "Unknown runtime configuration"; "spec_constants" => spec, "db_type" => db_type); Err("Unknown specification and/or db_type.".into()) diff --git a/eth2/types/src/beacon_state/beacon_state_types.rs b/eth2/types/src/beacon_state/beacon_state_types.rs index 1dc34e195..0e76942dd 100644 --- a/eth2/types/src/beacon_state/beacon_state_types.rs +++ b/eth2/types/src/beacon_state/beacon_state_types.rs @@ -200,3 +200,37 @@ impl EthSpec for MinimalEthSpec { } pub type MinimalBeaconState = BeaconState; + +/// Interop testnet spec +#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)] +pub struct InteropEthSpec; + +impl EthSpec for InteropEthSpec { + type ShardCount = U8; + type SlotsPerEpoch = U8; + type SlotsPerHistoricalRoot = U64; + type SlotsPerEth1VotingPeriod = U16; + type EpochsPerHistoricalVector = U64; + type EpochsPerSlashingsVector = U64; + type MaxPendingAttestations = U1024; // 128 max attestations * 8 slots per epoch + + params_from_eth_spec!(MainnetEthSpec { + JustificationBitsLength, + MaxValidatorsPerCommittee, + GenesisEpoch, + HistoricalRootsLimit, + ValidatorRegistryLimit, + MaxProposerSlashings, + MaxAttesterSlashings, + MaxAttestations, + MaxDeposits, + MaxVoluntaryExits, + MaxTransfers + }); + + fn default_spec() -> ChainSpec { + ChainSpec::interop() + } +} + +pub type InteropBeaconState = BeaconState; diff --git a/eth2/types/src/chain_spec.rs b/eth2/types/src/chain_spec.rs index 2128c6ef1..9dec626d4 100644 --- a/eth2/types/src/chain_spec.rs +++ b/eth2/types/src/chain_spec.rs @@ -92,7 +92,7 @@ pub struct ChainSpec { domain_transfer: u32, pub boot_nodes: Vec, - pub chain_id: u8, + pub network_id: u8, } impl ChainSpec { @@ -190,7 +190,7 @@ impl ChainSpec { * Network specific */ boot_nodes: vec![], - chain_id: 1, // mainnet chain id + network_id: 1, // mainnet network id } } @@ -208,7 +208,23 @@ impl ChainSpec { shuffle_round_count: 10, min_genesis_active_validator_count: 64, max_epochs_per_crosslink: 4, - chain_id: 2, // lighthouse testnet chain id + network_id: 2, // lighthouse testnet network id + boot_nodes, + ..ChainSpec::mainnet() + } + } + + /// Interop testing spec + /// + /// This allows us to customize a chain spec for interop testing. + pub fn interop() -> Self { + let boot_nodes = vec![]; + + Self { + seconds_per_slot: 12, + target_committee_size: 4, + shuffle_round_count: 10, + network_id: 13, boot_nodes, ..ChainSpec::mainnet() } diff --git a/eth2/utils/eth2_config/src/lib.rs b/eth2/utils/eth2_config/src/lib.rs index 17cbc4211..794a27e4e 100644 --- a/eth2/utils/eth2_config/src/lib.rs +++ b/eth2/utils/eth2_config/src/lib.rs @@ -37,6 +37,13 @@ impl Eth2Config { spec: ChainSpec::minimal(), } } + + pub fn interop() -> Self { + Self { + spec_constants: "interop".to_string(), + spec: ChainSpec::interop(), + } + } } impl Eth2Config { diff --git a/protos/src/services.proto b/protos/src/services.proto index bf23ff391..ba0462bbe 100644 --- a/protos/src/services.proto +++ b/protos/src/services.proto @@ -45,7 +45,7 @@ service AttestationService { message NodeInfoResponse { string version = 1; Fork fork = 2; - uint32 chain_id = 3; + uint32 network_id = 3; uint64 genesis_time = 4; uint64 genesis_slot = 5; } diff --git a/validator_client/eth2_config.toml b/validator_client/eth2_config.toml deleted file mode 100644 index 1e0781378..000000000 --- a/validator_client/eth2_config.toml +++ /dev/null @@ -1,47 +0,0 @@ -spec_constants = "minimal" - -[spec] -target_committee_size = 4 -max_indices_per_attestation = 4096 -min_per_epoch_churn_limit = 4 -churn_limit_quotient = 65536 -base_rewards_per_epoch = 5 -shuffle_round_count = 10 -deposit_contract_tree_depth = 32 -min_deposit_amount = 1000000000 -max_effective_balance = 32000000000 -ejection_balance = 16000000000 -effective_balance_increment = 1000000000 -genesis_slot = 0 -zero_hash = "0x0000000000000000000000000000000000000000000000000000000000000000" -bls_withdrawal_prefix_byte = "0x00" -genesis_time = 4294967295 -seconds_per_slot = 6 -min_attestation_inclusion_delay = 2 -min_seed_lookahead = 1 -activation_exit_delay = 4 -slots_per_eth1_voting_period = 16 -slots_per_historical_root = 8192 -min_validator_withdrawability_delay = 256 -persistent_committee_period = 2048 -max_crosslink_epochs = 64 -min_epochs_to_inactivity_penalty = 4 -base_reward_quotient = 32 -whistleblowing_reward_quotient = 512 -proposer_reward_quotient = 8 -inactivity_penalty_quotient = 33554432 -min_slashing_penalty_quotient = 32 -max_proposer_slashings = 16 -max_attester_slashings = 1 -max_attestations = 128 -max_deposits = 16 -max_voluntary_exits = 16 -max_transfers = 0 -domain_beacon_proposer = 0 -domain_randao = 1 -domain_attestation = 2 -domain_deposit = 3 -domain_voluntary_exit = 4 -domain_transfer = 5 -boot_nodes = ["/ip4/127.0.0.1/tcp/9000"] -chain_id = 2 diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index bd3919b5a..83a874df7 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -11,10 +11,10 @@ use crate::service::Service as ValidatorService; use clap::{App, Arg}; use eth2_config::{read_from_file, write_to_file, Eth2Config}; use protos::services_grpc::ValidatorServiceClient; -use slog::{crit, error, info, o, Drain, Level}; +use slog::{crit, error, info, o, warn, Drain, Level}; use std::fs; use std::path::PathBuf; -use types::{Keypair, MainnetEthSpec, MinimalEthSpec}; +use types::{InteropEthSpec, Keypair, MainnetEthSpec, MinimalEthSpec}; pub const DEFAULT_SPEC: &str = "minimal"; pub const DEFAULT_DATA_DIR: &str = ".lighthouse-validator"; @@ -64,14 +64,13 @@ fn main() { .takes_value(true), ) .arg( - Arg::with_name("spec-constants") - .long("spec-constants") + Arg::with_name("default-spec") + .long("default-spec") .value_name("TITLE") - .short("s") - .help("The title of the spec constants for chain config.") + .short("default-spec") + .help("Specifies the default eth2 spec to be used. This will override any spec written to disk and will therefore be used by default in future instances.") .takes_value(true) - .possible_values(&["mainnet", "minimal"]) - .default_value("minimal"), + .possible_values(&["mainnet", "minimal", "interop"]) ) .arg( Arg::with_name("debug-level") @@ -126,7 +125,7 @@ fn main() { let client_config_path = data_dir.join(CLIENT_CONFIG_FILENAME); - // Attempt to lead the `ClientConfig` from disk. + // Attempt to load the `ClientConfig` from disk. // // If file doesn't exist, create a new, default one. let mut client_config = match read_from_file::( @@ -164,29 +163,58 @@ fn main() { .and_then(|s| Some(PathBuf::from(s))) .unwrap_or_else(|| data_dir.join(ETH2_CONFIG_FILENAME)); - // Attempt to load the `Eth2Config` from file. + // Initialise the `Eth2Config`. // - // If the file doesn't exist, create a default one depending on the CLI flags. - let mut eth2_config = match read_from_file::(eth2_config_path.clone()) { - Ok(Some(c)) => c, - Ok(None) => { - let default = match matches.value_of("spec-constants") { - Some("mainnet") => Eth2Config::mainnet(), - Some("minimal") => Eth2Config::minimal(), - _ => unreachable!(), // Guarded by slog. - }; - if let Err(e) = write_to_file(eth2_config_path, &default) { - crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); - return; - } - default - } + // If a CLI parameter is set, overwrite any config file present. + // If a parameter is not set, use either the config file present or default to minimal. + let cli_config = match matches.value_of("default-spec") { + Some("mainnet") => Some(Eth2Config::mainnet()), + Some("minimal") => Some(Eth2Config::minimal()), + Some("interop") => Some(Eth2Config::interop()), + _ => None, + }; + // if a CLI flag is specified, write the new config if it doesn't exist, + // otherwise notify the user that the file will not be written. + let eth2_config_from_file = match read_from_file::(eth2_config_path.clone()) { + Ok(config) => config, Err(e) => { - crit!(log, "Failed to instantiate an Eth2Config"; "error" => format!("{:?}", e)); + crit!(log, "Failed to read the Eth2Config from file"; "error" => format!("{:?}", e)); return; } }; + let mut eth2_config = { + if let Some(cli_config) = cli_config { + if eth2_config_from_file.is_none() { + // write to file if one doesn't exist + if let Err(e) = write_to_file(eth2_config_path, &cli_config) { + crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); + return; + } + } else { + warn!( + log, + "Eth2Config file exists. Configuration file is ignored, using default" + ); + } + cli_config + } else { + // CLI config not specified, read from disk + match eth2_config_from_file { + Some(config) => config, + None => { + // set default to minimal + let eth2_config = Eth2Config::minimal(); + if let Err(e) = write_to_file(eth2_config_path, ð2_config) { + crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); + return; + } + eth2_config + } + } + } + }; + // Update the eth2 config with any CLI flags. match eth2_config.apply_cli_args(&matches) { Ok(()) => (), @@ -214,6 +242,11 @@ fn main() { eth2_config, log.clone(), ), + "interop" => ValidatorService::::start( + client_config, + eth2_config, + log.clone(), + ), other => { crit!(log, "Unknown spec constants"; "title" => other); return; diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index 3f99efe36..3ddb96e4c 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -23,7 +23,7 @@ use protos::services_grpc::{ AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient, ValidatorServiceClient, }; -use slog::{error, info, warn}; +use slog::{crit, error, info, warn}; use slot_clock::{SlotClock, SystemTimeSlotClock}; use std::marker::PhantomData; use std::sync::Arc; @@ -37,7 +37,7 @@ use types::{ChainSpec, Epoch, EthSpec, Fork, Slot}; /// A fixed amount of time after a slot to perform operations. This gives the node time to complete /// per-slot processes. -const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(200); +const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); /// The validator service. This is the main thread that executes and maintains validator /// duties. @@ -106,13 +106,13 @@ impl Service Service node_info.version.clone(), "Chain ID" => node_info.chain_id, "Genesis time" => genesis_time); + info!(log,"Beacon node connected"; "Node Version" => node_info.version.clone(), "Chain ID" => node_info.network_id, "Genesis time" => genesis_time); let proto_fork = node_info.get_fork(); let mut previous_version: [u8; 4] = [0; 4]; @@ -303,12 +303,16 @@ impl Service self.current_slot, - "The Timer should poll a new slot" - ); + // this is a non-fatal error. If the slot clock repeats, the node could + // have been slow to process the previous slot and is now duplicating tasks. + // We ignore duplicated but raise a critical error. + if current_slot <= self.current_slot { + crit!( + self.log, + "The validator tried to duplicate a slot. Likely missed the previous slot" + ); + return Err("Duplicate slot".into()); + } self.current_slot = current_slot; info!(self.log, "Processing"; "slot" => current_slot.as_u64(), "epoch" => current_epoch.as_u64()); Ok(())