From 468015f9bbc1d74522f041cc278740026d4e1893 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Sat, 10 Aug 2019 11:44:17 +1000 Subject: [PATCH 1/2] Initial Interop Updates (#492) * Add interop chain spec and rename chain_id * Add ability to connect to raw libp2p nodes * Adds Identify protocol, clean up RPC protocol name handling * Update to latest libp2p, gossipsub improvements * Updates to latest interop branch. - Shifts decoding of objects into message handler. - Updates to latest interop gossipsub. - Adds interop spec constant. * Configuration updates allow for verbosity CLI flag and spec constants * Update submodules to master * Correct minimal chainspec modifications * Duplication of validator polls are no longer fatal * Apply PR suggestions --- beacon_node/client/src/config.rs | 12 +- beacon_node/eth2-libp2p/Cargo.toml | 4 +- beacon_node/eth2-libp2p/src/behaviour.rs | 213 +++++++++--------- beacon_node/eth2-libp2p/src/config.rs | 26 ++- beacon_node/eth2-libp2p/src/discovery.rs | 12 +- beacon_node/eth2-libp2p/src/lib.rs | 4 +- beacon_node/eth2-libp2p/src/rpc/handler.rs | 27 ++- beacon_node/eth2-libp2p/src/rpc/mod.rs | 18 +- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 92 +++----- beacon_node/eth2-libp2p/src/service.rs | 52 +++-- beacon_node/http_server/src/api.rs | 2 +- beacon_node/http_server/src/lib.rs | 2 +- beacon_node/network/src/message_handler.rs | 101 ++++++--- beacon_node/network/src/service.rs | 33 ++- beacon_node/network/src/sync/simple_sync.rs | 40 ++-- beacon_node/rpc/src/attestation.rs | 12 +- beacon_node/rpc/src/beacon_block.rs | 14 +- beacon_node/rpc/src/beacon_node.rs | 2 +- beacon_node/rpc/src/lib.rs | 2 +- beacon_node/src/main.rs | 115 +++++++--- beacon_node/src/run.rs | 18 +- .../src/beacon_state/beacon_state_types.rs | 34 +++ eth2/types/src/chain_spec.rs | 22 +- eth2/utils/eth2_config/src/lib.rs | 7 + protos/src/services.proto | 2 +- validator_client/eth2_config.toml | 47 ---- validator_client/src/main.rs | 85 ++++--- validator_client/src/service.rs | 28 ++- 28 files changed, 590 insertions(+), 436 deletions(-) delete mode 100644 validator_client/eth2_config.toml diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 1a27de406..176625d77 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -1,3 +1,4 @@ +use crate::Eth2Config; use clap::ArgMatches; use http_server::HttpServerConfig; use network::NetworkConfig; @@ -56,8 +57,6 @@ impl Default for Config { log_file: PathBuf::from(""), db_type: "disk".to_string(), db_name: "chain_db".to_string(), - // Note: there are no default bootnodes specified. - // Once bootnodes are established, add them here. network: NetworkConfig::new(), rpc: rpc::RPCConfig::default(), http: HttpServerConfig::default(), @@ -129,6 +128,15 @@ impl Config { self.data_dir = PathBuf::from(dir); }; + if let Some(default_spec) = args.value_of("default-spec") { + match default_spec { + "mainnet" => self.spec_constants = Eth2Config::mainnet().spec_constants, + "minimal" => self.spec_constants = Eth2Config::minimal().spec_constants, + "interop" => self.spec_constants = Eth2Config::interop().spec_constants, + _ => {} // not supported + } + } + if let Some(dir) = args.value_of("db") { self.db_type = dir.to_string(); }; diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 016973480..794b09712 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -7,8 +7,8 @@ edition = "2018" [dependencies] clap = "2.32.0" #SigP repository -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b" } -enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b", features = ["serde"] } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b0d3cf7b4b0fa6c555b64dbdd110673a05457abd" } +enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "b0d3cf7b4b0fa6c555b64dbdd110673a05457abd", features = ["serde"] } types = { path = "../../eth2/types" } serde = "1.0" serde_derive = "1.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 37e3419a3..b87f8a061 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -2,47 +2,52 @@ use crate::discovery::Discovery; use crate::rpc::{RPCEvent, RPCMessage, RPC}; use crate::{error, NetworkConfig}; use crate::{Topic, TopicHash}; +use crate::{BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC}; use futures::prelude::*; use libp2p::{ - core::{ - identity::Keypair, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, - }, + core::identity::Keypair, discv5::Discv5Event, gossipsub::{Gossipsub, GossipsubEvent}, + identify::{Identify, IdentifyEvent}, ping::{Ping, PingConfig, PingEvent}, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; -use slog::{o, trace, warn}; -use ssz::{ssz_encode, Decode, DecodeError, Encode}; +use slog::{debug, o, trace}; +use ssz::{ssz_encode, Encode}; use std::num::NonZeroU32; use std::time::Duration; -use types::{Attestation, BeaconBlock, EthSpec}; + +const MAX_IDENTIFY_ADDRESSES: usize = 20; /// Builds the network behaviour that manages the core protocols of eth2. /// This core behaviour is managed by `Behaviour` which adds peer management to all core /// behaviours. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] -pub struct Behaviour { +#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] +pub struct Behaviour { /// The routing pub-sub mechanism for eth2. gossipsub: Gossipsub, - /// The serenity RPC specified in the wire-0 protocol. - serenity_rpc: RPC, + /// The Eth2 RPC specified in the wire-0 protocol. + eth2_rpc: RPC, /// Keep regular connection to peers and disconnect if absent. + // TODO: Remove Libp2p ping in favour of discv5 ping. ping: Ping, - /// Kademlia for peer discovery. + // TODO: Using id for initial interop. This will be removed by mainnet. + /// Provides IP addresses and peer information. + identify: Identify, + /// Discovery behaviour. discovery: Discovery, #[behaviour(ignore)] /// The events generated by this behaviour to be consumed in the swarm poll. - events: Vec>, + events: Vec, /// Logger for behaviour actions. #[behaviour(ignore)] log: slog::Logger, } -impl Behaviour { +impl Behaviour { pub fn new( local_key: &Keypair, net_conf: &NetworkConfig, @@ -50,17 +55,25 @@ impl Behaviour { ) -> error::Result { let local_peer_id = local_key.public().clone().into_peer_id(); let behaviour_log = log.new(o!()); + let ping_config = PingConfig::new() .with_timeout(Duration::from_secs(30)) .with_interval(Duration::from_secs(20)) .with_max_failures(NonZeroU32::new(2).expect("2 != 0")) .with_keep_alive(false); + let identify = Identify::new( + "lighthouse/libp2p".into(), + version::version(), + local_key.public(), + ); + Ok(Behaviour { - serenity_rpc: RPC::new(log), + eth2_rpc: RPC::new(log), gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()), discovery: Discovery::new(local_key, net_conf, log)?, ping: Ping::new(ping_config), + identify, events: Vec::new(), log: behaviour_log, }) @@ -68,31 +81,20 @@ impl Behaviour { } // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour -impl NetworkBehaviourEventProcess - for Behaviour +impl NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, event: GossipsubEvent) { match event { GossipsubEvent::Message(gs_msg) => { trace!(self.log, "Received GossipEvent"; "msg" => format!("{:?}", gs_msg)); - let pubsub_message = match PubsubMessage::from_ssz_bytes(&gs_msg.data) { - //TODO: Punish peer on error - Err(e) => { - warn!( - self.log, - "Received undecodable message from Peer {:?} error", gs_msg.source; - "error" => format!("{:?}", e) - ); - return; - } - Ok(msg) => msg, - }; + let msg = PubsubMessage::from_topics(&gs_msg.topics, gs_msg.data); self.events.push(BehaviourEvent::GossipMessage { source: gs_msg.source, topics: gs_msg.topics, - message: Box::new(pubsub_message), + message: msg, }); } GossipsubEvent::Subscribed { .. } => {} @@ -101,8 +103,8 @@ impl NetworkBehaviourEventProces } } -impl NetworkBehaviourEventProcess - for Behaviour +impl NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, event: RPCMessage) { match event { @@ -119,19 +121,19 @@ impl NetworkBehaviourEventProces } } -impl NetworkBehaviourEventProcess - for Behaviour +impl NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, _event: PingEvent) { // not interested in ping responses at the moment. } } -impl Behaviour { +impl Behaviour { /// Consumes the events list when polled. fn poll( &mut self, - ) -> Async>> { + ) -> Async> { if !self.events.is_empty() { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); } @@ -140,8 +142,36 @@ impl Behaviour { } } -impl NetworkBehaviourEventProcess - for Behaviour +impl NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, event: IdentifyEvent) { + match event { + IdentifyEvent::Identified { + peer_id, mut info, .. + } => { + if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES { + debug!( + self.log, + "More than 20 addresses have been identified, truncating" + ); + info.listen_addrs.truncate(MAX_IDENTIFY_ADDRESSES); + } + debug!(self.log, "Identified Peer"; "Peer" => format!("{}", peer_id), + "Protocol Version" => info.protocol_version, + "Agent Version" => info.agent_version, + "Listening Addresses" => format!("{:?}", info.listen_addrs), + "Protocols" => format!("{:?}", info.protocols) + ); + } + IdentifyEvent::Error { .. } => {} + IdentifyEvent::SendBack { .. } => {} + } + } +} + +impl NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, _event: Discv5Event) { // discv5 has no events to inject @@ -149,7 +179,7 @@ impl NetworkBehaviourEventProces } /// Implements the combined behaviour for the libp2p service. -impl Behaviour { +impl Behaviour { /* Pubsub behaviour functions */ /// Subscribes to a gossipsub topic. @@ -158,7 +188,7 @@ impl Behaviour { } /// Publishes a message on the pubsub (gossipsub) behaviour. - pub fn publish(&mut self, topics: Vec, message: PubsubMessage) { + pub fn publish(&mut self, topics: Vec, message: PubsubMessage) { let message_bytes = ssz_encode(&message); for topic in topics { self.gossipsub.publish(topic, message_bytes.clone()); @@ -169,7 +199,7 @@ impl Behaviour { /// Sends an RPC Request/Response via the RPC protocol. pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { - self.serenity_rpc.send_rpc(peer_id, rpc_event); + self.eth2_rpc.send_rpc(peer_id, rpc_event); } /* Discovery / Peer management functions */ @@ -179,99 +209,60 @@ impl Behaviour { } /// The types of events than can be obtained from polling the behaviour. -pub enum BehaviourEvent { +pub enum BehaviourEvent { RPC(PeerId, RPCEvent), PeerDialed(PeerId), PeerDisconnected(PeerId), GossipMessage { source: PeerId, topics: Vec, - message: Box>, + message: PubsubMessage, }, } /// Messages that are passed to and from the pubsub (Gossipsub) behaviour. #[derive(Debug, Clone, PartialEq)] -pub enum PubsubMessage { +pub enum PubsubMessage { /// Gossipsub message providing notification of a new block. - Block(BeaconBlock), + Block(Vec), /// Gossipsub message providing notification of a new attestation. - Attestation(Attestation), + Attestation(Vec), + /// Gossipsub message from an unknown topic. + Unknown(Vec), } -//TODO: Correctly encode/decode enums. Prefixing with integer for now. -impl Encode for PubsubMessage { +impl PubsubMessage { + /* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will + * need to be modified. + * + * Also note that a message can be associated with many topics. As soon as one of the topics is + * known we match. If none of the topics are known we return an unknown state. + */ + fn from_topics(topics: &Vec, data: Vec) -> Self { + for topic in topics { + match topic.as_str() { + BEACON_BLOCK_TOPIC => return PubsubMessage::Block(data), + BEACON_ATTESTATION_TOPIC => return PubsubMessage::Attestation(data), + _ => {} + } + } + PubsubMessage::Unknown(data) + } +} + +impl Encode for PubsubMessage { fn is_ssz_fixed_len() -> bool { false } fn ssz_append(&self, buf: &mut Vec) { - let offset = ::ssz_fixed_len() + as Encode>::ssz_fixed_len(); - - let mut encoder = ssz::SszEncoder::container(buf, offset); - match self { - PubsubMessage::Block(block_gossip) => { - encoder.append(&0_u32); - + PubsubMessage::Block(inner) + | PubsubMessage::Attestation(inner) + | PubsubMessage::Unknown(inner) => { // Encode the gossip as a Vec; - encoder.append(&block_gossip.as_ssz_bytes()); - } - PubsubMessage::Attestation(attestation_gossip) => { - encoder.append(&1_u32); - - // Encode the gossip as a Vec; - encoder.append(&attestation_gossip.as_ssz_bytes()); + buf.append(&mut inner.as_ssz_bytes()); } } - - encoder.finalize(); - } -} - -impl Decode for PubsubMessage { - fn is_ssz_fixed_len() -> bool { - false - } - - fn from_ssz_bytes(bytes: &[u8]) -> Result { - let mut builder = ssz::SszDecoderBuilder::new(&bytes); - - builder.register_type::()?; - builder.register_type::>()?; - - let mut decoder = builder.build()?; - - let id: u32 = decoder.decode_next()?; - let body: Vec = decoder.decode_next()?; - - match id { - 0 => Ok(PubsubMessage::Block(BeaconBlock::from_ssz_bytes(&body)?)), - 1 => Ok(PubsubMessage::Attestation(Attestation::from_ssz_bytes( - &body, - )?)), - _ => Err(DecodeError::BytesInvalid( - "Invalid PubsubMessage id".to_string(), - )), - } - } -} - -#[cfg(test)] -mod test { - use super::*; - use types::*; - - #[test] - fn ssz_encoding() { - let original = PubsubMessage::Block(BeaconBlock::::empty( - &MainnetEthSpec::default_spec(), - )); - - let encoded = ssz_encode(&original); - - let decoded = PubsubMessage::from_ssz_bytes(&encoded).unwrap(); - - assert_eq!(original, decoded); } } diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index 7391dba8a..ddf14cc04 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -1,12 +1,13 @@ use clap::ArgMatches; use enr::Enr; use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder}; +use libp2p::Multiaddr; use serde_derive::{Deserialize, Serialize}; use std::path::PathBuf; use std::time::Duration; /// The beacon node topic string to subscribe to. -pub const BEACON_PUBSUB_TOPIC: &str = "beacon_block"; +pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; pub const BEACON_ATTESTATION_TOPIC: &str = "beacon_attestation"; pub const SHARD_TOPIC_PREFIX: &str = "shard"; @@ -39,6 +40,9 @@ pub struct Config { /// List of nodes to initially connect to. pub boot_nodes: Vec, + /// List of libp2p nodes to initially connect to. + pub libp2p_nodes: Vec, + /// Client version pub client_version: String, @@ -60,12 +64,13 @@ impl Default for Config { discovery_port: 9000, max_peers: 10, //TODO: Set realistic values for production + // Note: This defaults topics to plain strings. Not hashes gs_config: GossipsubConfigBuilder::new() - .max_gossip_size(4_000_000) - .inactivity_timeout(Duration::from_secs(90)) + .max_transmit_size(1_000_000) .heartbeat_interval(Duration::from_secs(20)) .build(), boot_nodes: vec![], + libp2p_nodes: vec![], client_version: version::version(), topics: Vec::new(), } @@ -118,6 +123,21 @@ impl Config { .collect::, _>>()?; } + if let Some(libp2p_addresses_str) = args.value_of("libp2p-addresses") { + self.libp2p_nodes = libp2p_addresses_str + .split(',') + .map(|multiaddr| { + multiaddr + .parse() + .map_err(|_| format!("Invalid Multiaddr: {}", multiaddr)) + }) + .collect::, _>>()?; + } + + if let Some(topics_str) = args.value_of("topics") { + self.topics = topics_str.split(',').map(|s| s.into()).collect(); + } + if let Some(discovery_address_str) = args.value_of("discovery-address") { self.discovery_address = discovery_address_str .parse() diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index c2f008756..4c1794945 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -4,13 +4,11 @@ use crate::{error, NetworkConfig}; /// Currently using discv5 for peer discovery. /// use futures::prelude::*; -use libp2p::core::swarm::{ - ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, -}; -use libp2p::core::{identity::Keypair, Multiaddr, PeerId, ProtocolsHandler}; +use libp2p::core::{identity::Keypair, ConnectedPoint, Multiaddr, PeerId}; use libp2p::discv5::{Discv5, Discv5Event}; use libp2p::enr::{Enr, EnrBuilder, NodeId}; use libp2p::multiaddr::Protocol; +use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler}; use slog::{debug, info, o, warn}; use std::collections::HashSet; use std::fs::File; @@ -37,6 +35,9 @@ pub struct Discovery { /// The target number of connected peers on the libp2p interface. max_peers: usize, + /// directory to save ENR to + enr_dir: String, + /// The delay between peer discovery searches. peer_discovery_delay: Delay, @@ -54,9 +55,6 @@ pub struct Discovery { /// Logger for the discovery behaviour. log: slog::Logger, - - /// directory to save ENR to - enr_dir: String, } impl Discovery { diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 7a3b2e632..54a4f2a99 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -11,9 +11,9 @@ mod service; pub use behaviour::PubsubMessage; pub use config::{ - Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC, SHARD_TOPIC_PREFIX, + Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC, SHARD_TOPIC_PREFIX, }; -pub use libp2p::floodsub::{Topic, TopicBuilder, TopicHash}; +pub use libp2p::gossipsub::{Topic, TopicHash}; pub use libp2p::multiaddr; pub use libp2p::Multiaddr; pub use libp2p::{ diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 4e796f6fb..dbc32c5a4 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -5,23 +5,21 @@ use crate::rpc::protocol::{InboundFramed, OutboundFramed}; use core::marker::PhantomData; use fnv::FnvHashMap; use futures::prelude::*; -use libp2p::core::protocols_handler::{ +use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade}; +use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; -use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade}; use smallvec::SmallVec; use std::time::{Duration, Instant}; use tokio_io::{AsyncRead, AsyncWrite}; -use types::EthSpec; /// The time (in seconds) before a substream that is awaiting a response times out. pub const RESPONSE_TIMEOUT: u64 = 9; /// Implementation of `ProtocolsHandler` for the RPC protocol. -pub struct RPCHandler +pub struct RPCHandler where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, @@ -56,8 +54,8 @@ where /// After the given duration has elapsed, an inactive connection will shutdown. inactive_timeout: Duration, - /// Phantom EthSpec. - _phantom: PhantomData, + /// Marker to pin the generic stream. + _phantom: PhantomData, } /// An outbound substream is waiting a response from the user. @@ -90,10 +88,9 @@ where }, } -impl RPCHandler +impl RPCHandler where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { pub fn new( listen_protocol: SubstreamProtocol, @@ -145,20 +142,18 @@ where } } -impl Default for RPCHandler +impl Default for RPCHandler where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { fn default() -> Self { RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(30)) } } -impl ProtocolsHandler for RPCHandler +impl ProtocolsHandler for RPCHandler where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { type InEvent = RPCEvent; type OutEvent = RPCEvent; @@ -273,7 +268,11 @@ where Self::Error, > { if let Some(err) = self.pending_error.take() { - return Err(err); + // Returning an error here will result in dropping any peer that doesn't support any of + // the RPC protocols. For our immediate purposes we permit this and simply log that an + // upgrade was not supported. + // TODO: Add a logger to the handler for trace output. + dbg!(&err); } // return any events that need to be reported diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 88060e602..756a62e71 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -6,9 +6,9 @@ use futures::prelude::*; use handler::RPCHandler; -use libp2p::core::protocols_handler::ProtocolsHandler; -use libp2p::core::swarm::{ - ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, +use libp2p::core::ConnectedPoint; +use libp2p::swarm::{ + protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; use libp2p::{Multiaddr, PeerId}; pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse, RequestId}; @@ -16,7 +16,6 @@ pub use protocol::{RPCError, RPCProtocol, RPCRequest}; use slog::o; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; -use types::EthSpec; pub(crate) mod codec; mod handler; @@ -50,16 +49,16 @@ impl RPCEvent { /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. -pub struct RPC { +pub struct RPC { /// Queue of events to processed. events: Vec>, /// Pins the generic substream. - marker: PhantomData<(TSubstream, E)>, + marker: PhantomData<(TSubstream)>, /// Slog logger for RPC behaviour. _log: slog::Logger, } -impl RPC { +impl RPC { pub fn new(log: &slog::Logger) -> Self { let log = log.new(o!("Service" => "Libp2p-RPC")); RPC { @@ -80,12 +79,11 @@ impl RPC { } } -impl NetworkBehaviour for RPC +impl NetworkBehaviour for RPC where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { - type ProtocolsHandler = RPCHandler; + type ProtocolsHandler = RPCHandler; type OutEvent = RPCMessage; fn new_handler(&mut self) -> Self::ProtocolsHandler { diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 8729de3a7..b606fc743 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -8,7 +8,7 @@ use futures::{ future::{self, FutureResult}, sink, stream, Sink, Stream, }; -use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; use std::io; use std::time::Duration; use tokio::codec::Framed; @@ -28,24 +28,22 @@ const REQUEST_TIMEOUT: u64 = 3; pub struct RPCProtocol; impl UpgradeInfo for RPCProtocol { - type Info = RawProtocolId; + type Info = ProtocolId; type InfoIter = Vec; fn protocol_info(&self) -> Self::InfoIter { vec![ - ProtocolId::new("hello", "1.0.0", "ssz").into(), - ProtocolId::new("goodbye", "1.0.0", "ssz").into(), - ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into(), - ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into(), - ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into(), + ProtocolId::new("hello", "1.0.0", "ssz"), + ProtocolId::new("goodbye", "1.0.0", "ssz"), + ProtocolId::new("beacon_block_roots", "1.0.0", "ssz"), + ProtocolId::new("beacon_block_headers", "1.0.0", "ssz"), + ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz"), ] } } -/// The raw protocol id sent over the wire. -type RawProtocolId = Vec; - /// Tracks the types in a protocol id. +#[derive(Clone)] pub struct ProtocolId { /// The rpc message type/name. pub message_name: String, @@ -55,44 +53,31 @@ pub struct ProtocolId { /// The encoding of the RPC. pub encoding: String, + + /// The protocol id that is formed from the above fields. + protocol_id: String, } /// An RPC protocol ID. impl ProtocolId { pub fn new(message_name: &str, version: &str, encoding: &str) -> Self { + let protocol_id = format!( + "{}/{}/{}/{}", + PROTOCOL_PREFIX, message_name, version, encoding + ); + ProtocolId { message_name: message_name.into(), version: version.into(), encoding: encoding.into(), + protocol_id, } } - - /// Converts a raw RPC protocol id string into an `RPCProtocolId` - pub fn from_bytes(bytes: &[u8]) -> Result { - let protocol_string = String::from_utf8(bytes.to_vec()) - .map_err(|_| RPCError::InvalidProtocol("Invalid protocol Id"))?; - let protocol_list: Vec<&str> = protocol_string.as_str().split('/').take(7).collect(); - - if protocol_list.len() != 7 { - return Err(RPCError::InvalidProtocol("Not enough '/'")); - } - - Ok(ProtocolId { - message_name: protocol_list[4].into(), - version: protocol_list[5].into(), - encoding: protocol_list[6].into(), - }) - } } -impl Into for ProtocolId { - fn into(self) -> RawProtocolId { - format!( - "{}/{}/{}/{}", - PROTOCOL_PREFIX, self.message_name, self.version, self.encoding - ) - .as_bytes() - .to_vec() +impl ProtocolName for ProtocolId { + fn protocol_name(&self) -> &[u8] { + self.protocol_id.as_bytes() } } @@ -127,16 +112,11 @@ where fn upgrade_inbound( self, socket: upgrade::Negotiated, - protocol: RawProtocolId, + protocol: ProtocolId, ) -> Self::Future { - // TODO: Verify this - let protocol_id = - ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols"); - - match protocol_id.encoding.as_str() { + match protocol.encoding.as_str() { "ssz" | _ => { - let ssz_codec = - BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, MAX_RPC_SIZE)); + let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE)); let codec = InboundCodec::SSZ(ssz_codec); Framed::new(socket, codec) .into_future() @@ -171,7 +151,7 @@ pub enum RPCRequest { } impl UpgradeInfo for RPCRequest { - type Info = RawProtocolId; + type Info = ProtocolId; type InfoIter = Vec; // add further protocols as we support more encodings/versions @@ -182,22 +162,25 @@ impl UpgradeInfo for RPCRequest { /// Implements the encoding per supported protocol for RPCRequest. impl RPCRequest { - pub fn supported_protocols(&self) -> Vec { + pub fn supported_protocols(&self) -> Vec { match self { // add more protocols when versions/encodings are supported - RPCRequest::Hello(_) => vec![ProtocolId::new("hello", "1.0.0", "ssz").into()], - RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1.0.0", "ssz").into()], + RPCRequest::Hello(_) => vec![ + ProtocolId::new("hello", "1.0.0", "ssz"), + ProtocolId::new("goodbye", "1.0.0", "ssz"), + ], + RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1.0.0", "ssz")], RPCRequest::BeaconBlockRoots(_) => { - vec![ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into()] + vec![ProtocolId::new("beacon_block_roots", "1.0.0", "ssz")] } RPCRequest::BeaconBlockHeaders(_) => { - vec![ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into()] + vec![ProtocolId::new("beacon_block_headers", "1.0.0", "ssz")] } RPCRequest::BeaconBlockBodies(_) => { - vec![ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into()] + vec![ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz")] } RPCRequest::BeaconChainState(_) => { - vec![ProtocolId::new("beacon_block_state", "1.0.0", "ssz").into()] + vec![ProtocolId::new("beacon_block_state", "1.0.0", "ssz")] } } } @@ -230,12 +213,9 @@ where socket: upgrade::Negotiated, protocol: Self::Info, ) -> Self::Future { - let protocol_id = - ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols"); - - match protocol_id.encoding.as_str() { + match protocol.encoding.as_str() { "ssz" | _ => { - let ssz_codec = BaseOutboundCodec::new(SSZOutboundCodec::new(protocol_id, 4096)); + let ssz_codec = BaseOutboundCodec::new(SSZOutboundCodec::new(protocol, 4096)); let codec = OutboundCodec::SSZ(ssz_codec); Framed::new(socket, codec).send(self) } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 05ae9e473..316aa0579 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -3,8 +3,8 @@ use crate::error; use crate::multiaddr::Protocol; use crate::rpc::RPCEvent; use crate::NetworkConfig; -use crate::{TopicBuilder, TopicHash}; -use crate::{BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC}; +use crate::{Topic, TopicHash}; +use crate::{BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC}; use futures::prelude::*; use futures::Stream; use libp2p::core::{ @@ -21,25 +21,24 @@ use std::fs::File; use std::io::prelude::*; use std::io::{Error, ErrorKind}; use std::time::Duration; -use types::EthSpec; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; -type Libp2pBehaviour = Behaviour, E>; +type Libp2pBehaviour = Behaviour>; const NETWORK_KEY_FILENAME: &str = "key"; /// The configuration and state of the libp2p components for the beacon node. -pub struct Service { +pub struct Service { /// The libp2p Swarm handler. //TODO: Make this private - pub swarm: Swarm>, + pub swarm: Swarm, /// This node's PeerId. _local_peer_id: PeerId, /// The libp2p logger handle. pub log: slog::Logger, } -impl Service { +impl Service { pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result { debug!(log, "Network-libp2p Service starting"); @@ -76,18 +75,35 @@ impl Service { ), }; + // attempt to connect to user-input libp2p nodes + for multiaddr in config.libp2p_nodes { + match Swarm::dial_addr(&mut swarm, multiaddr.clone()) { + Ok(()) => debug!(log, "Dialing libp2p node: {}", multiaddr), + Err(err) => debug!( + log, + "Could not connect to node: {} error: {:?}", multiaddr, err + ), + }; + } + // subscribe to default gossipsub topics let mut topics = vec![]; //TODO: Handle multiple shard attestations. For now we simply use a separate topic for - //attestations - topics.push(BEACON_ATTESTATION_TOPIC.to_string()); - topics.push(BEACON_PUBSUB_TOPIC.to_string()); - topics.append(&mut config.topics.clone()); + // attestations + topics.push(Topic::new(BEACON_ATTESTATION_TOPIC.into())); + topics.push(Topic::new(BEACON_BLOCK_TOPIC.into())); + topics.append( + &mut config + .topics + .iter() + .cloned() + .map(|s| Topic::new(s)) + .collect(), + ); let mut subscribed_topics = vec![]; for topic in topics { - let t = TopicBuilder::new(topic.clone()).build(); - if swarm.subscribe(t) { + if swarm.subscribe(topic.clone()) { trace!(log, "Subscribed to topic: {:?}", topic); subscribed_topics.push(topic); } else { @@ -104,8 +120,8 @@ impl Service { } } -impl Stream for Service { - type Item = Libp2pEvent; +impl Stream for Service { + type Item = Libp2pEvent; type Error = crate::error::Error; fn poll(&mut self) -> Poll, Self::Error> { @@ -119,7 +135,7 @@ impl Stream for Service { topics, message, } => { - trace!(self.log, "Pubsub message received: {:?}", message); + trace!(self.log, "Gossipsub message received"; "Message" => format!("{:?}", message)); return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage { source, topics, @@ -179,7 +195,7 @@ fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox) } /// Events that can be obtained from polling the Libp2p Service. -pub enum Libp2pEvent { +pub enum Libp2pEvent { /// An RPC response request has been received on the swarm. RPC(PeerId, RPCEvent), /// Initiated the connection to a new peer. @@ -190,7 +206,7 @@ pub enum Libp2pEvent { PubsubMessage { source: PeerId, topics: Vec, - message: Box>, + message: PubsubMessage, }, } diff --git a/beacon_node/http_server/src/api.rs b/beacon_node/http_server/src/api.rs index a91080899..8cb023b02 100644 --- a/beacon_node/http_server/src/api.rs +++ b/beacon_node/http_server/src/api.rs @@ -64,7 +64,7 @@ fn handle_fork(req: &mut Request) -> IronResult( pub fn start_service( config: &HttpServerConfig, executor: &TaskExecutor, - _network_chan: mpsc::UnboundedSender>, + _network_chan: mpsc::UnboundedSender, beacon_chain: Arc>, db_path: PathBuf, metrics_registry: Registry, diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index eaddce533..72a507ad7 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -14,7 +14,7 @@ use slog::{debug, warn}; use ssz::{Decode, DecodeError}; use std::sync::Arc; use tokio::sync::mpsc; -use types::{BeaconBlockHeader, EthSpec}; +use types::{Attestation, BeaconBlock, BeaconBlockHeader}; /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { @@ -23,14 +23,14 @@ pub struct MessageHandler { /// The syncing framework. sync: SimpleSync, /// The context required to send messages to, and process messages from peers. - network_context: NetworkContext, + network_context: NetworkContext, /// The `MessageHandler` logger. log: slog::Logger, } /// Types of messages the handler can receive. #[derive(Debug)] -pub enum HandlerMessage { +pub enum HandlerMessage { /// We have initiated a connection to a new peer. PeerDialed(PeerId), /// Peer has disconnected, @@ -38,17 +38,17 @@ pub enum HandlerMessage { /// An RPC response/request has been received. RPC(PeerId, RPCEvent), /// A gossip message has been received. - PubsubMessage(PeerId, Box>), + PubsubMessage(PeerId, PubsubMessage), } impl MessageHandler { /// Initializes and runs the MessageHandler. pub fn spawn( beacon_chain: Arc>, - network_send: mpsc::UnboundedSender>, + network_send: mpsc::UnboundedSender, executor: &tokio::runtime::TaskExecutor, log: slog::Logger, - ) -> error::Result>> { + ) -> error::Result> { debug!(log, "Service starting"); let (handler_send, handler_recv) = mpsc::unbounded_channel(); @@ -78,7 +78,7 @@ impl MessageHandler { } /// Handle all messages incoming from the network service. - fn handle_message(&mut self, message: HandlerMessage) { + fn handle_message(&mut self, message: HandlerMessage) { match message { // we have initiated a connection to a peer HandlerMessage::PeerDialed(peer_id) => { @@ -94,7 +94,7 @@ impl MessageHandler { } // we have received an RPC message request/response HandlerMessage::PubsubMessage(peer_id, gossip) => { - self.handle_gossip(peer_id, *gossip); + self.handle_gossip(peer_id, gossip); } } } @@ -218,6 +218,62 @@ impl MessageHandler { } } + /// Handle various RPC errors + fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { + //TODO: Handle error correctly + warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error)); + } + + /// Handle RPC messages + fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { + match gossip_message { + PubsubMessage::Block(message) => match self.decode_gossip_block(message) { + Err(e) => { + debug!(self.log, "Invalid Gossiped Beacon Block"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e)); + } + Ok(block) => { + let _should_forward_on = + self.sync + .on_block_gossip(peer_id, block, &mut self.network_context); + } + }, + PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) { + Err(e) => { + debug!(self.log, "Invalid Gossiped Attestation"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e)); + } + Ok(attestation) => { + self.sync + .on_attestation_gossip(peer_id, attestation, &mut self.network_context) + } + }, + PubsubMessage::Unknown(message) => { + // Received a message from an unknown topic. Ignore for now + debug!(self.log, "Unknown Gossip Message"; "Peer" => format!("{}", peer_id), "Message" => format!("{:?}", message)); + } + } + } + + /* Decoding of blocks and attestations from the network. + * + * TODO: Apply efficient decoding/verification of these objects + */ + + fn decode_gossip_block( + &self, + beacon_block: Vec, + ) -> Result, DecodeError> { + //TODO: Apply verification before decoding. + BeaconBlock::from_ssz_bytes(&beacon_block) + } + + fn decode_gossip_attestation( + &self, + beacon_block: Vec, + ) -> Result, DecodeError> { + //TODO: Apply verification before decoding. + Attestation::from_ssz_bytes(&beacon_block) + } + /// Verifies and decodes the ssz-encoded block bodies received from peers. fn decode_block_bodies( &self, @@ -241,39 +297,18 @@ impl MessageHandler { //TODO: Implement faster header verification before decoding entirely Vec::from_ssz_bytes(&headers_response.headers) } - - /// Handle various RPC errors - fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { - //TODO: Handle error correctly - warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error)); - } - - /// Handle RPC messages - fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { - match gossip_message { - PubsubMessage::Block(message) => { - let _should_forward_on = - self.sync - .on_block_gossip(peer_id, message, &mut self.network_context); - } - PubsubMessage::Attestation(message) => { - self.sync - .on_attestation_gossip(peer_id, message, &mut self.network_context) - } - } - } } // TODO: RPC Rewrite makes this struct fairly pointless -pub struct NetworkContext { +pub struct NetworkContext { /// The network channel to relay messages to the Network service. - network_send: mpsc::UnboundedSender>, + network_send: mpsc::UnboundedSender, /// The `MessageHandler` logger. log: slog::Logger, } -impl NetworkContext { - pub fn new(network_send: mpsc::UnboundedSender>, log: slog::Logger) -> Self { +impl NetworkContext { + pub fn new(network_send: mpsc::UnboundedSender, log: slog::Logger) -> Self { Self { network_send, log } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7a21f7f28..e5ca2a917 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -14,13 +14,12 @@ use slog::{debug, info, o, trace}; use std::sync::Arc; use tokio::runtime::TaskExecutor; use tokio::sync::{mpsc, oneshot}; -use types::EthSpec; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { - libp2p_service: Arc>>, + libp2p_service: Arc>, _libp2p_exit: oneshot::Sender<()>, - _network_send: mpsc::UnboundedSender>, + _network_send: mpsc::UnboundedSender, _phantom: PhantomData, //message_handler: MessageHandler, //message_handler_send: Sender } @@ -31,9 +30,9 @@ impl Service { config: &NetworkConfig, executor: &TaskExecutor, log: slog::Logger, - ) -> error::Result<(Arc, mpsc::UnboundedSender>)> { + ) -> error::Result<(Arc, mpsc::UnboundedSender)> { // build the network channel - let (network_send, network_recv) = mpsc::unbounded_channel::>(); + let (network_send, network_recv) = mpsc::unbounded_channel::(); // launch message handler thread let message_handler_log = log.new(o!("Service" => "MessageHandler")); let message_handler_send = MessageHandler::spawn( @@ -65,15 +64,15 @@ impl Service { Ok((Arc::new(network_service), network_send)) } - pub fn libp2p_service(&self) -> Arc>> { + pub fn libp2p_service(&self) -> Arc> { self.libp2p_service.clone() } } -fn spawn_service( - libp2p_service: Arc>>, - network_recv: mpsc::UnboundedReceiver>, - message_handler_send: mpsc::UnboundedSender>, +fn spawn_service( + libp2p_service: Arc>, + network_recv: mpsc::UnboundedReceiver, + message_handler_send: mpsc::UnboundedSender, executor: &TaskExecutor, log: slog::Logger, ) -> error::Result> { @@ -99,10 +98,10 @@ fn spawn_service( } //TODO: Potentially handle channel errors -fn network_service( - libp2p_service: Arc>>, - mut network_recv: mpsc::UnboundedReceiver>, - mut message_handler_send: mpsc::UnboundedSender>, +fn network_service( + libp2p_service: Arc>, + mut network_recv: mpsc::UnboundedReceiver, + mut message_handler_send: mpsc::UnboundedSender, log: slog::Logger, ) -> impl futures::Future { futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> { @@ -119,7 +118,7 @@ fn network_service( }, NetworkMessage::Publish { topics, message } => { debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); - libp2p_service.lock().swarm.publish(topics, *message); + libp2p_service.lock().swarm.publish(topics, message); } }, Ok(Async::NotReady) => break, @@ -176,14 +175,14 @@ fn network_service( /// Types of messages that the network service can receive. #[derive(Debug)] -pub enum NetworkMessage { +pub enum NetworkMessage { /// Send a message to libp2p service. //TODO: Define typing for messages across the wire Send(PeerId, OutgoingMessage), /// Publish a message to pubsub mechanism. Publish { topics: Vec, - message: Box>, + message: PubsubMessage, }, } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 215e37e7f..c3271888a 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -123,7 +123,7 @@ impl SimpleSync { /// Handle the connection of a new peer. /// /// Sends a `Hello` message to the peer. - pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { + pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { info!(self.log, "PeerConnected"; "peer" => format!("{:?}", peer_id)); network.send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain))); @@ -137,7 +137,7 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, hello: HelloMessage, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); @@ -156,7 +156,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, hello: HelloMessage, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id)); @@ -171,7 +171,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, hello: HelloMessage, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { let remote = PeerSyncInfo::from(hello); let local = PeerSyncInfo::from(&self.chain); @@ -278,7 +278,7 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, req: BeaconBlockRootsRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { let state = &self.chain.head().beacon_state; @@ -325,7 +325,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, res: BeaconBlockRootsResponse, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -389,7 +389,7 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, req: BeaconBlockHeadersRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { let state = &self.chain.head().beacon_state; @@ -441,7 +441,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, headers: Vec, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -473,7 +473,7 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, req: BeaconBlockBodiesRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { let block_bodies: Vec> = req .block_roots @@ -519,7 +519,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, res: DecodedBeaconBlockBodiesResponse, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -558,7 +558,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, block: BeaconBlock, - network: &mut NetworkContext, + network: &mut NetworkContext, ) -> bool { if let Some(outcome) = self.process_block(peer_id.clone(), block.clone(), network, &"gossip") @@ -628,7 +628,7 @@ impl SimpleSync { &mut self, _peer_id: PeerId, msg: Attestation, - _network: &mut NetworkContext, + _network: &mut NetworkContext, ) { match self.chain.process_attestation(msg) { Ok(()) => info!(self.log, "ImportedAttestation"; "source" => "gossip"), @@ -643,7 +643,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, req: BeaconBlockRootsRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { // Potentially set state to sync. if self.state == SyncState::Idle && req.count > SLOT_IMPORT_TOLERANCE { @@ -667,7 +667,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, req: BeaconBlockHeadersRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -684,7 +684,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, req: BeaconBlockBodiesRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -720,7 +720,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, block_root: Hash256, - network: &mut NetworkContext, + network: &mut NetworkContext, source: &str, ) -> Option { match self.import_queue.attempt_complete_block(block_root) { @@ -813,7 +813,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, block: BeaconBlock, - network: &mut NetworkContext, + network: &mut NetworkContext, source: &str, ) -> Option { let processing_result = self.chain.process_block(block.clone()); @@ -915,9 +915,9 @@ fn hello_message(beacon_chain: &BeaconChain) -> HelloMes let state = &beacon_chain.head().beacon_state; HelloMessage { - //TODO: Correctly define the chain/network id - network_id: spec.chain_id, - chain_id: u64::from(spec.chain_id), + network_id: spec.network_id, + //TODO: Correctly define the chain id + chain_id: spec.network_id as u64, latest_finalized_root: state.finalized_checkpoint.root, latest_finalized_epoch: state.finalized_checkpoint.epoch, best_root: beacon_chain.head().beacon_block_root, diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index 20425d292..1f507f0f3 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -1,6 +1,6 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PubsubMessage; -use eth2_libp2p::TopicBuilder; +use eth2_libp2p::Topic; use eth2_libp2p::BEACON_ATTESTATION_TOPIC; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; @@ -11,7 +11,7 @@ use protos::services::{ }; use protos::services_grpc::AttestationService; use slog::{error, info, trace, warn}; -use ssz::{ssz_encode, Decode}; +use ssz::{ssz_encode, Decode, Encode}; use std::sync::Arc; use tokio::sync::mpsc; use types::Attestation; @@ -19,7 +19,7 @@ use types::Attestation; #[derive(Clone)] pub struct AttestationServiceInstance { pub chain: Arc>, - pub network_chan: mpsc::UnboundedSender>, + pub network_chan: mpsc::UnboundedSender, pub log: slog::Logger, } @@ -144,13 +144,13 @@ impl AttestationService for AttestationServiceInstance { ); // valid attestation, propagate to the network - let topic = TopicBuilder::new(BEACON_ATTESTATION_TOPIC).build(); - let message = PubsubMessage::Attestation(attestation); + let topic = Topic::new(BEACON_ATTESTATION_TOPIC.into()); + let message = PubsubMessage::Attestation(attestation.as_ssz_bytes()); self.network_chan .try_send(NetworkMessage::Publish { topics: vec![topic], - message: Box::new(message), + message: message, }) .unwrap_or_else(|e| { error!( diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index b42bbb208..b1a67399e 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -1,6 +1,6 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; -use eth2_libp2p::BEACON_PUBSUB_TOPIC; -use eth2_libp2p::{PubsubMessage, TopicBuilder}; +use eth2_libp2p::BEACON_BLOCK_TOPIC; +use eth2_libp2p::{PubsubMessage, Topic}; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use network::NetworkMessage; @@ -11,7 +11,7 @@ use protos::services::{ use protos::services_grpc::BeaconBlockService; use slog::Logger; use slog::{error, info, trace, warn}; -use ssz::{ssz_encode, Decode}; +use ssz::{ssz_encode, Decode, Encode}; use std::sync::Arc; use tokio::sync::mpsc; use types::{BeaconBlock, Signature, Slot}; @@ -19,7 +19,7 @@ use types::{BeaconBlock, Signature, Slot}; #[derive(Clone)] pub struct BeaconBlockServiceInstance { pub chain: Arc>, - pub network_chan: mpsc::UnboundedSender>, + pub network_chan: mpsc::UnboundedSender, pub log: Logger, } @@ -106,14 +106,14 @@ impl BeaconBlockService for BeaconBlockServiceInstance { ); // get the network topic to send on - let topic = TopicBuilder::new(BEACON_PUBSUB_TOPIC).build(); - let message = PubsubMessage::Block(block); + let topic = Topic::new(BEACON_BLOCK_TOPIC.into()); + let message = PubsubMessage::Block(block.as_ssz_bytes()); // Publish the block to the p2p network via gossipsub. self.network_chan .try_send(NetworkMessage::Publish { topics: vec![topic], - message: Box::new(message), + message: message, }) .unwrap_or_else(|e| { error!( diff --git a/beacon_node/rpc/src/beacon_node.rs b/beacon_node/rpc/src/beacon_node.rs index 631601ac9..5d635c9d1 100644 --- a/beacon_node/rpc/src/beacon_node.rs +++ b/beacon_node/rpc/src/beacon_node.rs @@ -37,7 +37,7 @@ impl BeaconNodeService for BeaconNodeServiceInstance { node_info.set_fork(fork); node_info.set_genesis_time(genesis_time); node_info.set_genesis_slot(spec.genesis_slot.as_u64()); - node_info.set_chain_id(u32::from(spec.chain_id)); + node_info.set_network_id(u32::from(spec.network_id)); // send the node_info the requester let error_log = self.log.clone(); diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index de9039505..eef009292 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -25,7 +25,7 @@ use tokio::sync::mpsc; pub fn start_server( config: &RPCConfig, executor: &TaskExecutor, - network_chan: mpsc::UnboundedSender>, + network_chan: mpsc::UnboundedSender, beacon_chain: Arc>, log: &slog::Logger, ) -> exit_future::Signal { diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index dd0c695b4..b34259f5a 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -4,7 +4,7 @@ use clap::{App, Arg}; use client::{ClientConfig, Eth2Config}; use env_logger::{Builder, Env}; use eth2_config::{read_from_file, write_to_file}; -use slog::{crit, o, Drain, Level}; +use slog::{crit, o, warn, Drain, Level}; use std::fs; use std::path::PathBuf; @@ -52,10 +52,17 @@ fn main() { .arg( Arg::with_name("listen-address") .long("listen-address") - .value_name("Address") + .value_name("ADDRESS") .help("The address lighthouse will listen for UDP and TCP connections. (default 127.0.0.1).") .takes_value(true), ) + .arg( + Arg::with_name("port") + .long("port") + .value_name("PORT") + .help("The TCP/UDP port to listen on. The UDP port can be modified by the --discovery-port flag.") + .takes_value(true), + ) .arg( Arg::with_name("maxpeers") .long("maxpeers") @@ -70,27 +77,34 @@ fn main() { .help("One or more comma-delimited base64-encoded ENR's to bootstrap the p2p network.") .takes_value(true), ) - .arg( - Arg::with_name("port") - .long("port") - .value_name("Lighthouse Port") - .help("The TCP/UDP port to listen on. The UDP port can be modified by the --discovery-port flag.") - .takes_value(true), - ) .arg( Arg::with_name("discovery-port") .long("disc-port") - .value_name("DiscoveryPort") + .value_name("PORT") .help("The discovery UDP port.") .takes_value(true), ) .arg( Arg::with_name("discovery-address") .long("discovery-address") - .value_name("Address") + .value_name("ADDRESS") .help("The IP address to broadcast to other peers on how to reach this node.") .takes_value(true), ) + .arg( + Arg::with_name("topics") + .long("topics") + .value_name("STRING") + .help("One or more comma-delimited gossipsub topic strings to subscribe to.") + .takes_value(true), + ) + .arg( + Arg::with_name("libp2p-addresses") + .long("libp2p-addresses") + .value_name("MULTIADDR") + .help("One or more comma-delimited multiaddrs to manually connect to a libp2p peer without an ENR.") + .takes_value(true), + ) /* * gRPC parameters. */ @@ -136,6 +150,7 @@ fn main() { .help("Listen port for the HTTP server.") .takes_value(true), ) + /* Client related arguments */ .arg( Arg::with_name("api") .long("api") @@ -178,12 +193,9 @@ fn main() { .long("default-spec") .value_name("TITLE") .short("default-spec") - .help("Specifies the default eth2 spec to be used. Overridden by any spec loaded - from disk. A spec will be written to disk after this flag is used, so it is - primarily used for creating eth2 spec files.") + .help("Specifies the default eth2 spec to be used. This will override any spec written to disk and will therefore be used by default in future instances.") .takes_value(true) - .possible_values(&["mainnet", "minimal"]) - .default_value("minimal"), + .possible_values(&["mainnet", "minimal", "interop"]) ) .arg( Arg::with_name("recent-genesis") @@ -202,7 +214,7 @@ fn main() { .help("The title of the spec constants for chain config.") .takes_value(true) .possible_values(&["info", "debug", "trace", "warn", "error", "crit"]) - .default_value("info"), + .default_value("trace"), ) .arg( Arg::with_name("verbosity") @@ -301,29 +313,58 @@ fn main() { let eth2_config_path = data_dir.join(ETH2_CONFIG_FILENAME); - // Attempt to load the `Eth2Config` from file. + // Initialise the `Eth2Config`. // - // If the file doesn't exist, create a default one depending on the CLI flags. - let mut eth2_config = match read_from_file::(eth2_config_path.clone()) { - Ok(Some(c)) => c, - Ok(None) => { - let default = match matches.value_of("default-spec") { - Some("mainnet") => Eth2Config::mainnet(), - Some("minimal") => Eth2Config::minimal(), - _ => unreachable!(), // Guarded by slog. - }; - if let Err(e) = write_to_file(eth2_config_path, &default) { - crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); - return; - } - default - } + // If a CLI parameter is set, overwrite any config file present. + // If a parameter is not set, use either the config file present or default to minimal. + let cli_config = match matches.value_of("default-spec") { + Some("mainnet") => Some(Eth2Config::mainnet()), + Some("minimal") => Some(Eth2Config::minimal()), + Some("interop") => Some(Eth2Config::interop()), + _ => None, + }; + // if a CLI flag is specified, write the new config if it doesn't exist, + // otherwise notify the user that the file will not be written. + let eth2_config_from_file = match read_from_file::(eth2_config_path.clone()) { + Ok(config) => config, Err(e) => { - crit!(log, "Failed to load/generate an Eth2Config"; "error" => format!("{:?}", e)); + crit!(log, "Failed to read the Eth2Config from file"; "error" => format!("{:?}", e)); return; } }; + let mut eth2_config = { + if let Some(cli_config) = cli_config { + if eth2_config_from_file.is_none() { + // write to file if one doesn't exist + if let Err(e) = write_to_file(eth2_config_path, &cli_config) { + crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); + return; + } + } else { + warn!( + log, + "Eth2Config file exists. Configuration file is ignored, using default" + ); + } + cli_config + } else { + // CLI config not specified, read from disk + match eth2_config_from_file { + Some(config) => config, + None => { + // set default to minimal + let eth2_config = Eth2Config::minimal(); + if let Err(e) = write_to_file(eth2_config_path, ð2_config) { + crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); + return; + } + eth2_config + } + } + } + }; + // Update the eth2 config with any CLI flags. match eth2_config.apply_cli_args(&matches) { Ok(()) => (), @@ -333,6 +374,12 @@ fn main() { } }; + // check to ensure the spec constants between the client and eth2_config match + if eth2_config.spec_constants != client_config.spec_constants { + crit!(log, "Specification constants do not match."; "client_config" => format!("{}", client_config.spec_constants), "eth2_config" => format!("{}", eth2_config.spec_constants)); + return; + } + // Start the node using a `tokio` executor. match run::run_beacon_node(client_config, eth2_config, &log) { Ok(_) => {} diff --git a/beacon_node/src/run.rs b/beacon_node/src/run.rs index 010993988..c16d23e5f 100644 --- a/beacon_node/src/run.rs +++ b/beacon_node/src/run.rs @@ -13,7 +13,7 @@ use tokio::runtime::Builder; use tokio::runtime::Runtime; use tokio::runtime::TaskExecutor; use tokio_timer::clock::Clock; -use types::{MainnetEthSpec, MinimalEthSpec}; +use types::{InteropEthSpec, MainnetEthSpec, MinimalEthSpec}; /// Reads the configuration and initializes a `BeaconChain` with the required types and parameters. /// @@ -90,6 +90,22 @@ pub fn run_beacon_node( runtime, log, ), + ("disk", "interop") => run::>( + &db_path, + client_config, + eth2_config, + executor, + runtime, + log, + ), + ("memory", "interop") => run::>( + &db_path, + client_config, + eth2_config, + executor, + runtime, + log, + ), (db_type, spec) => { error!(log, "Unknown runtime configuration"; "spec_constants" => spec, "db_type" => db_type); Err("Unknown specification and/or db_type.".into()) diff --git a/eth2/types/src/beacon_state/beacon_state_types.rs b/eth2/types/src/beacon_state/beacon_state_types.rs index 1dc34e195..0e76942dd 100644 --- a/eth2/types/src/beacon_state/beacon_state_types.rs +++ b/eth2/types/src/beacon_state/beacon_state_types.rs @@ -200,3 +200,37 @@ impl EthSpec for MinimalEthSpec { } pub type MinimalBeaconState = BeaconState; + +/// Interop testnet spec +#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)] +pub struct InteropEthSpec; + +impl EthSpec for InteropEthSpec { + type ShardCount = U8; + type SlotsPerEpoch = U8; + type SlotsPerHistoricalRoot = U64; + type SlotsPerEth1VotingPeriod = U16; + type EpochsPerHistoricalVector = U64; + type EpochsPerSlashingsVector = U64; + type MaxPendingAttestations = U1024; // 128 max attestations * 8 slots per epoch + + params_from_eth_spec!(MainnetEthSpec { + JustificationBitsLength, + MaxValidatorsPerCommittee, + GenesisEpoch, + HistoricalRootsLimit, + ValidatorRegistryLimit, + MaxProposerSlashings, + MaxAttesterSlashings, + MaxAttestations, + MaxDeposits, + MaxVoluntaryExits, + MaxTransfers + }); + + fn default_spec() -> ChainSpec { + ChainSpec::interop() + } +} + +pub type InteropBeaconState = BeaconState; diff --git a/eth2/types/src/chain_spec.rs b/eth2/types/src/chain_spec.rs index 2128c6ef1..9dec626d4 100644 --- a/eth2/types/src/chain_spec.rs +++ b/eth2/types/src/chain_spec.rs @@ -92,7 +92,7 @@ pub struct ChainSpec { domain_transfer: u32, pub boot_nodes: Vec, - pub chain_id: u8, + pub network_id: u8, } impl ChainSpec { @@ -190,7 +190,7 @@ impl ChainSpec { * Network specific */ boot_nodes: vec![], - chain_id: 1, // mainnet chain id + network_id: 1, // mainnet network id } } @@ -208,7 +208,23 @@ impl ChainSpec { shuffle_round_count: 10, min_genesis_active_validator_count: 64, max_epochs_per_crosslink: 4, - chain_id: 2, // lighthouse testnet chain id + network_id: 2, // lighthouse testnet network id + boot_nodes, + ..ChainSpec::mainnet() + } + } + + /// Interop testing spec + /// + /// This allows us to customize a chain spec for interop testing. + pub fn interop() -> Self { + let boot_nodes = vec![]; + + Self { + seconds_per_slot: 12, + target_committee_size: 4, + shuffle_round_count: 10, + network_id: 13, boot_nodes, ..ChainSpec::mainnet() } diff --git a/eth2/utils/eth2_config/src/lib.rs b/eth2/utils/eth2_config/src/lib.rs index 17cbc4211..794a27e4e 100644 --- a/eth2/utils/eth2_config/src/lib.rs +++ b/eth2/utils/eth2_config/src/lib.rs @@ -37,6 +37,13 @@ impl Eth2Config { spec: ChainSpec::minimal(), } } + + pub fn interop() -> Self { + Self { + spec_constants: "interop".to_string(), + spec: ChainSpec::interop(), + } + } } impl Eth2Config { diff --git a/protos/src/services.proto b/protos/src/services.proto index bf23ff391..ba0462bbe 100644 --- a/protos/src/services.proto +++ b/protos/src/services.proto @@ -45,7 +45,7 @@ service AttestationService { message NodeInfoResponse { string version = 1; Fork fork = 2; - uint32 chain_id = 3; + uint32 network_id = 3; uint64 genesis_time = 4; uint64 genesis_slot = 5; } diff --git a/validator_client/eth2_config.toml b/validator_client/eth2_config.toml deleted file mode 100644 index 1e0781378..000000000 --- a/validator_client/eth2_config.toml +++ /dev/null @@ -1,47 +0,0 @@ -spec_constants = "minimal" - -[spec] -target_committee_size = 4 -max_indices_per_attestation = 4096 -min_per_epoch_churn_limit = 4 -churn_limit_quotient = 65536 -base_rewards_per_epoch = 5 -shuffle_round_count = 10 -deposit_contract_tree_depth = 32 -min_deposit_amount = 1000000000 -max_effective_balance = 32000000000 -ejection_balance = 16000000000 -effective_balance_increment = 1000000000 -genesis_slot = 0 -zero_hash = "0x0000000000000000000000000000000000000000000000000000000000000000" -bls_withdrawal_prefix_byte = "0x00" -genesis_time = 4294967295 -seconds_per_slot = 6 -min_attestation_inclusion_delay = 2 -min_seed_lookahead = 1 -activation_exit_delay = 4 -slots_per_eth1_voting_period = 16 -slots_per_historical_root = 8192 -min_validator_withdrawability_delay = 256 -persistent_committee_period = 2048 -max_crosslink_epochs = 64 -min_epochs_to_inactivity_penalty = 4 -base_reward_quotient = 32 -whistleblowing_reward_quotient = 512 -proposer_reward_quotient = 8 -inactivity_penalty_quotient = 33554432 -min_slashing_penalty_quotient = 32 -max_proposer_slashings = 16 -max_attester_slashings = 1 -max_attestations = 128 -max_deposits = 16 -max_voluntary_exits = 16 -max_transfers = 0 -domain_beacon_proposer = 0 -domain_randao = 1 -domain_attestation = 2 -domain_deposit = 3 -domain_voluntary_exit = 4 -domain_transfer = 5 -boot_nodes = ["/ip4/127.0.0.1/tcp/9000"] -chain_id = 2 diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index bd3919b5a..83a874df7 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -11,10 +11,10 @@ use crate::service::Service as ValidatorService; use clap::{App, Arg}; use eth2_config::{read_from_file, write_to_file, Eth2Config}; use protos::services_grpc::ValidatorServiceClient; -use slog::{crit, error, info, o, Drain, Level}; +use slog::{crit, error, info, o, warn, Drain, Level}; use std::fs; use std::path::PathBuf; -use types::{Keypair, MainnetEthSpec, MinimalEthSpec}; +use types::{InteropEthSpec, Keypair, MainnetEthSpec, MinimalEthSpec}; pub const DEFAULT_SPEC: &str = "minimal"; pub const DEFAULT_DATA_DIR: &str = ".lighthouse-validator"; @@ -64,14 +64,13 @@ fn main() { .takes_value(true), ) .arg( - Arg::with_name("spec-constants") - .long("spec-constants") + Arg::with_name("default-spec") + .long("default-spec") .value_name("TITLE") - .short("s") - .help("The title of the spec constants for chain config.") + .short("default-spec") + .help("Specifies the default eth2 spec to be used. This will override any spec written to disk and will therefore be used by default in future instances.") .takes_value(true) - .possible_values(&["mainnet", "minimal"]) - .default_value("minimal"), + .possible_values(&["mainnet", "minimal", "interop"]) ) .arg( Arg::with_name("debug-level") @@ -126,7 +125,7 @@ fn main() { let client_config_path = data_dir.join(CLIENT_CONFIG_FILENAME); - // Attempt to lead the `ClientConfig` from disk. + // Attempt to load the `ClientConfig` from disk. // // If file doesn't exist, create a new, default one. let mut client_config = match read_from_file::( @@ -164,29 +163,58 @@ fn main() { .and_then(|s| Some(PathBuf::from(s))) .unwrap_or_else(|| data_dir.join(ETH2_CONFIG_FILENAME)); - // Attempt to load the `Eth2Config` from file. + // Initialise the `Eth2Config`. // - // If the file doesn't exist, create a default one depending on the CLI flags. - let mut eth2_config = match read_from_file::(eth2_config_path.clone()) { - Ok(Some(c)) => c, - Ok(None) => { - let default = match matches.value_of("spec-constants") { - Some("mainnet") => Eth2Config::mainnet(), - Some("minimal") => Eth2Config::minimal(), - _ => unreachable!(), // Guarded by slog. - }; - if let Err(e) = write_to_file(eth2_config_path, &default) { - crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); - return; - } - default - } + // If a CLI parameter is set, overwrite any config file present. + // If a parameter is not set, use either the config file present or default to minimal. + let cli_config = match matches.value_of("default-spec") { + Some("mainnet") => Some(Eth2Config::mainnet()), + Some("minimal") => Some(Eth2Config::minimal()), + Some("interop") => Some(Eth2Config::interop()), + _ => None, + }; + // if a CLI flag is specified, write the new config if it doesn't exist, + // otherwise notify the user that the file will not be written. + let eth2_config_from_file = match read_from_file::(eth2_config_path.clone()) { + Ok(config) => config, Err(e) => { - crit!(log, "Failed to instantiate an Eth2Config"; "error" => format!("{:?}", e)); + crit!(log, "Failed to read the Eth2Config from file"; "error" => format!("{:?}", e)); return; } }; + let mut eth2_config = { + if let Some(cli_config) = cli_config { + if eth2_config_from_file.is_none() { + // write to file if one doesn't exist + if let Err(e) = write_to_file(eth2_config_path, &cli_config) { + crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); + return; + } + } else { + warn!( + log, + "Eth2Config file exists. Configuration file is ignored, using default" + ); + } + cli_config + } else { + // CLI config not specified, read from disk + match eth2_config_from_file { + Some(config) => config, + None => { + // set default to minimal + let eth2_config = Eth2Config::minimal(); + if let Err(e) = write_to_file(eth2_config_path, ð2_config) { + crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); + return; + } + eth2_config + } + } + } + }; + // Update the eth2 config with any CLI flags. match eth2_config.apply_cli_args(&matches) { Ok(()) => (), @@ -214,6 +242,11 @@ fn main() { eth2_config, log.clone(), ), + "interop" => ValidatorService::::start( + client_config, + eth2_config, + log.clone(), + ), other => { crit!(log, "Unknown spec constants"; "title" => other); return; diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index 3f99efe36..3ddb96e4c 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -23,7 +23,7 @@ use protos::services_grpc::{ AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient, ValidatorServiceClient, }; -use slog::{error, info, warn}; +use slog::{crit, error, info, warn}; use slot_clock::{SlotClock, SystemTimeSlotClock}; use std::marker::PhantomData; use std::sync::Arc; @@ -37,7 +37,7 @@ use types::{ChainSpec, Epoch, EthSpec, Fork, Slot}; /// A fixed amount of time after a slot to perform operations. This gives the node time to complete /// per-slot processes. -const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(200); +const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); /// The validator service. This is the main thread that executes and maintains validator /// duties. @@ -106,13 +106,13 @@ impl Service Service node_info.version.clone(), "Chain ID" => node_info.chain_id, "Genesis time" => genesis_time); + info!(log,"Beacon node connected"; "Node Version" => node_info.version.clone(), "Chain ID" => node_info.network_id, "Genesis time" => genesis_time); let proto_fork = node_info.get_fork(); let mut previous_version: [u8; 4] = [0; 4]; @@ -303,12 +303,16 @@ impl Service self.current_slot, - "The Timer should poll a new slot" - ); + // this is a non-fatal error. If the slot clock repeats, the node could + // have been slow to process the previous slot and is now duplicating tasks. + // We ignore duplicated but raise a critical error. + if current_slot <= self.current_slot { + crit!( + self.log, + "The validator tried to duplicate a slot. Likely missed the previous slot" + ); + return Err("Duplicate slot".into()); + } self.current_slot = current_slot; info!(self.log, "Processing"; "slot" => current_slot.as_u64(), "epoch" => current_epoch.as_u64()); Ok(()) From 989e2727d76a12cd4defbd2043f99d173026b25c Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 10 Aug 2019 17:15:15 +1000 Subject: [PATCH 2/2] Changes to rest_api (#480) * Add half-finished rest api changes * Add basic, messy changes to rest api * Fix expect() in ApiRequest * Remove ApiRequest, add route for beacon state * Tidy rest api, add get state from root * Add api method for getting state roots by slot * Add test for URL helper * Simplify state_at_slot fn * Add tests for rest api helper fns * Add extra tests for parse root * Fix clippy lints * Fix compile error in rest api * Update test to new ethereum-types --- beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- beacon_node/client/src/config.rs | 4 +- beacon_node/rest_api/Cargo.toml | 5 +- beacon_node/rest_api/src/beacon.rs | 60 +++++++ beacon_node/rest_api/src/beacon_node.rs | 65 -------- beacon_node/rest_api/src/helpers.rs | 156 +++++++++++++++++++ beacon_node/rest_api/src/lib.rs | 142 ++++++++++------- beacon_node/rest_api/src/macros.rs | 23 --- beacon_node/rest_api/src/node.rs | 25 +++ beacon_node/rest_api/src/url_query.rs | 112 +++++++++++++ beacon_node/src/main.rs | 1 - beacon_node/store/src/iter.rs | 9 ++ 12 files changed, 451 insertions(+), 153 deletions(-) create mode 100644 beacon_node/rest_api/src/beacon.rs delete mode 100644 beacon_node/rest_api/src/beacon_node.rs create mode 100644 beacon_node/rest_api/src/helpers.rs delete mode 100644 beacon_node/rest_api/src/macros.rs create mode 100644 beacon_node/rest_api/src/node.rs create mode 100644 beacon_node/rest_api/src/url_query.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 34902b215..28ba5fe48 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -296,7 +296,7 @@ impl BeaconChain { /// It is important to note that the `beacon_state` returned may not match the present slot. It /// is the state as it was when the head block was received, which could be some slots prior to /// now. - pub fn head(&self) -> RwLockReadGuard> { + pub fn head<'a>(&'a self) -> RwLockReadGuard<'a, CheckPoint> { self.canonical_head.read() } diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 176625d77..ee62b6281 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -26,7 +26,7 @@ pub struct Config { pub network: network::NetworkConfig, pub rpc: rpc::RPCConfig, pub http: HttpServerConfig, - pub rest_api: rest_api::APIConfig, + pub rest_api: rest_api::ApiConfig, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -60,7 +60,7 @@ impl Default for Config { network: NetworkConfig::new(), rpc: rpc::RPCConfig::default(), http: HttpServerConfig::default(), - rest_api: rest_api::APIConfig::default(), + rest_api: rest_api::ApiConfig::default(), spec_constants: TESTNET_SPEC_CONSTANTS.into(), genesis_state: GenesisState::RecentGenesis { validator_count: TESTNET_VALIDATOR_COUNT, diff --git a/beacon_node/rest_api/Cargo.toml b/beacon_node/rest_api/Cargo.toml index 7a63ca036..fb6cb8413 100644 --- a/beacon_node/rest_api/Cargo.toml +++ b/beacon_node/rest_api/Cargo.toml @@ -7,16 +7,19 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] beacon_chain = { path = "../beacon_chain" } +store = { path = "../store" } version = { path = "../version" } serde = { version = "1.0", features = ["derive"] } serde_json = "^1.0" slog = "^2.2.3" slog-term = "^2.4.0" slog-async = "^2.3.0" +state_processing = { path = "../../eth2/state_processing" } +types = { path = "../../eth2/types" } clap = "2.32.0" http = "^0.1.17" hyper = "0.12.32" -hyper-router = "^0.5" futures = "0.1" exit-future = "0.1.3" tokio = "0.1.17" +url = "2.0" diff --git a/beacon_node/rest_api/src/beacon.rs b/beacon_node/rest_api/src/beacon.rs new file mode 100644 index 000000000..cef23abe8 --- /dev/null +++ b/beacon_node/rest_api/src/beacon.rs @@ -0,0 +1,60 @@ +use super::{success_response, ApiResult}; +use crate::{helpers::*, ApiError, UrlQuery}; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use hyper::{Body, Request}; +use std::sync::Arc; +use store::Store; +use types::BeaconState; + +/// HTTP handler to return a `BeaconState` at a given `root` or `slot`. +/// +/// Will not return a state if the request slot is in the future. Will return states higher than +/// the current head by skipping slots. +pub fn get_state(req: Request) -> ApiResult { + let beacon_chain = req + .extensions() + .get::>>() + .ok_or_else(|| ApiError::ServerError("Beacon chain extension missing".to_string()))?; + + let query_params = ["root", "slot"]; + let (key, value) = UrlQuery::from_request(&req)?.first_of(&query_params)?; + + let state: BeaconState = match (key.as_ref(), value) { + ("slot", value) => state_at_slot(&beacon_chain, parse_slot(&value)?)?, + ("root", value) => { + let root = &parse_root(&value)?; + + beacon_chain + .store + .get(root)? + .ok_or_else(|| ApiError::NotFound(format!("No state for root: {}", root)))? + } + _ => unreachable!("Guarded by UrlQuery::from_request()"), + }; + + let json: String = serde_json::to_string(&state) + .map_err(|e| ApiError::ServerError(format!("Unable to serialize BeaconState: {:?}", e)))?; + + Ok(success_response(Body::from(json))) +} + +/// HTTP handler to return a `BeaconState` root at a given or `slot`. +/// +/// Will not return a state if the request slot is in the future. Will return states higher than +/// the current head by skipping slots. +pub fn get_state_root(req: Request) -> ApiResult { + let beacon_chain = req + .extensions() + .get::>>() + .ok_or_else(|| ApiError::ServerError("Beacon chain extension missing".to_string()))?; + + let slot_string = UrlQuery::from_request(&req)?.only_one("slot")?; + let slot = parse_slot(&slot_string)?; + + let root = state_root_at_slot(&beacon_chain, slot)?; + + let json: String = serde_json::to_string(&root) + .map_err(|e| ApiError::ServerError(format!("Unable to serialize root: {:?}", e)))?; + + Ok(success_response(Body::from(json))) +} diff --git a/beacon_node/rest_api/src/beacon_node.rs b/beacon_node/rest_api/src/beacon_node.rs deleted file mode 100644 index bd8d98a53..000000000 --- a/beacon_node/rest_api/src/beacon_node.rs +++ /dev/null @@ -1,65 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use serde::Serialize; -use slog::info; -use std::sync::Arc; -use version; - -use super::{path_from_request, success_response, APIResult, APIService}; - -use hyper::{Body, Request, Response}; -use hyper_router::{Route, RouterBuilder}; - -#[derive(Clone)] -pub struct BeaconNodeServiceInstance { - pub marker: std::marker::PhantomData, -} - -/// A string which uniquely identifies the client implementation and its version; similar to [HTTP User-Agent](https://tools.ietf.org/html/rfc7231#section-5.5.3). -#[derive(Serialize)] -pub struct Version(String); -impl From for Version { - fn from(x: String) -> Self { - Version(x) - } -} - -/// The genesis_time configured for the beacon node, which is the unix time at which the Eth2.0 chain began. -#[derive(Serialize)] -pub struct GenesisTime(u64); -impl From for GenesisTime { - fn from(x: u64) -> Self { - GenesisTime(x) - } -} - -impl APIService for BeaconNodeServiceInstance { - fn add_routes(&mut self, router_builder: RouterBuilder) -> Result { - let router_builder = router_builder - .add(Route::get("/version").using(result_to_response!(get_version))) - .add(Route::get("/genesis_time").using(result_to_response!(get_genesis_time::))); - Ok(router_builder) - } -} - -/// Read the version string from the current Lighthouse build. -fn get_version(_req: Request) -> APIResult { - let ver = Version::from(version::version()); - let body = Body::from( - serde_json::to_string(&ver).expect("Version should always be serialializable as JSON."), - ); - Ok(success_response(body)) -} - -/// Read the genesis time from the current beacon chain state. -fn get_genesis_time(req: Request) -> APIResult { - let beacon_chain = req.extensions().get::>>().unwrap(); - let gen_time = { - let state = &beacon_chain.head().beacon_state; - state.genesis_time - }; - let body = Body::from( - serde_json::to_string(&gen_time) - .expect("Genesis should time always have a valid JSON serialization."), - ); - Ok(success_response(body)) -} diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs new file mode 100644 index 000000000..2a429076c --- /dev/null +++ b/beacon_node/rest_api/src/helpers.rs @@ -0,0 +1,156 @@ +use crate::ApiError; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use store::{iter::AncestorIter, Store}; +use types::{BeaconState, EthSpec, Hash256, RelativeEpoch, Slot}; + +/// Parse a slot from a `0x` preixed string. +/// +/// E.g., `"1234"` +pub fn parse_slot(string: &str) -> Result { + string + .parse::() + .map(Slot::from) + .map_err(|e| ApiError::InvalidQueryParams(format!("Unable to parse slot: {:?}", e))) +} + +/// Parse a root from a `0x` preixed string. +/// +/// E.g., `"0x0000000000000000000000000000000000000000000000000000000000000000"` +pub fn parse_root(string: &str) -> Result { + const PREFIX: &str = "0x"; + + if string.starts_with(PREFIX) { + let trimmed = string.trim_start_matches(PREFIX); + trimmed + .parse() + .map_err(|e| ApiError::InvalidQueryParams(format!("Unable to parse root: {:?}", e))) + } else { + Err(ApiError::InvalidQueryParams( + "Root must have a '0x' prefix".to_string(), + )) + } +} + +/// Returns a `BeaconState` in the canonical chain of `beacon_chain` at the given `slot`, if +/// possible. +/// +/// Will not return a state if the request slot is in the future. Will return states higher than +/// the current head by skipping slots. +pub fn state_at_slot( + beacon_chain: &BeaconChain, + slot: Slot, +) -> Result, ApiError> { + let head_state = &beacon_chain.head().beacon_state; + + if head_state.slot == slot { + // The request slot is the same as the best block (head) slot. + + // I'm not sure if this `.clone()` will be optimized out. If not, it seems unnecessary. + Ok(beacon_chain.head().beacon_state.clone()) + } else { + let root = state_root_at_slot(beacon_chain, slot)?; + + let state: BeaconState = beacon_chain + .store + .get(&root)? + .ok_or_else(|| ApiError::NotFound(format!("Unable to find state at root {}", root)))?; + + Ok(state) + } +} + +/// Returns the root of the `BeaconState` in the canonical chain of `beacon_chain` at the given +/// `slot`, if possible. +/// +/// Will not return a state root if the request slot is in the future. Will return state roots +/// higher than the current head by skipping slots. +pub fn state_root_at_slot( + beacon_chain: &BeaconChain, + slot: Slot, +) -> Result { + let head_state = &beacon_chain.head().beacon_state; + let current_slot = beacon_chain + .read_slot_clock() + .ok_or_else(|| ApiError::ServerError("Unable to read slot clock".to_string()))?; + + // There are four scenarios when obtaining a state for a given slot: + // + // 1. The request slot is in the future. + // 2. The request slot is the same as the best block (head) slot. + // 3. The request slot is prior to the head slot. + // 4. The request slot is later than the head slot. + if current_slot < slot { + // 1. The request slot is in the future. Reject the request. + // + // We could actually speculate about future state roots by skipping slots, however that's + // likely to cause confusion for API users. + Err(ApiError::InvalidQueryParams(format!( + "Requested slot {} is past the current slot {}", + slot, current_slot + ))) + } else if head_state.slot == slot { + // 2. The request slot is the same as the best block (head) slot. + // + // The head state root is stored in memory, return a reference. + Ok(beacon_chain.head().beacon_state_root) + } else if head_state.slot > slot { + // 3. The request slot is prior to the head slot. + // + // Iterate through the state roots on the head state to find the root for that + // slot. Once the root is found, load it from the database. + Ok(head_state + .try_iter_ancestor_roots(beacon_chain.store.clone()) + .ok_or_else(|| ApiError::ServerError("Failed to create roots iterator".to_string()))? + .find(|(_root, s)| *s == slot) + .map(|(root, _slot)| root) + .ok_or_else(|| ApiError::NotFound(format!("Unable to find state at slot {}", slot)))?) + } else { + // 4. The request slot is later than the head slot. + // + // Use `per_slot_processing` to advance the head state to the present slot, + // assuming that all slots do not contain a block (i.e., they are skipped slots). + let mut state = beacon_chain.head().beacon_state.clone(); + let spec = &T::EthSpec::default_spec(); + + for _ in state.slot.as_u64()..slot.as_u64() { + // Ensure the next epoch state caches are built in case of an epoch transition. + state.build_committee_cache(RelativeEpoch::Next, spec)?; + + state_processing::per_slot_processing(&mut state, spec)?; + } + + // Note: this is an expensive operation. Once the tree hash cache is implement it may be + // used here. + Ok(state.canonical_root()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn parse_root_works() { + assert_eq!( + parse_root("0x0000000000000000000000000000000000000000000000000000000000000000"), + Ok(Hash256::zero()) + ); + assert_eq!( + parse_root("0x000000000000000000000000000000000000000000000000000000000000002a"), + Ok(Hash256::from_low_u64_be(42)) + ); + assert!( + parse_root("0000000000000000000000000000000000000000000000000000000000000042").is_err() + ); + assert!(parse_root("0x").is_err()); + assert!(parse_root("0x00").is_err()); + } + + #[test] + fn parse_slot_works() { + assert_eq!(parse_slot("0"), Ok(Slot::new(0))); + assert_eq!(parse_slot("42"), Ok(Slot::new(42))); + assert_eq!(parse_slot("10000000"), Ok(Slot::new(10_000_000))); + assert!(parse_slot("cats").is_err()); + } +} diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index 0f7849449..a94a8cdf4 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -1,37 +1,42 @@ extern crate futures; extern crate hyper; -#[macro_use] -mod macros; -mod beacon_node; -pub mod config; +mod beacon; +mod config; +mod helpers; +mod node; +mod url_query; use beacon_chain::{BeaconChain, BeaconChainTypes}; -pub use config::Config as APIConfig; - +pub use config::Config as ApiConfig; +use hyper::rt::Future; +use hyper::service::service_fn_ok; +use hyper::{Body, Method, Response, Server, StatusCode}; use slog::{info, o, warn}; use std::sync::Arc; use tokio::runtime::TaskExecutor; +use url_query::UrlQuery; -use crate::beacon_node::BeaconNodeServiceInstance; -use hyper::rt::Future; -use hyper::service::{service_fn, Service}; -use hyper::{Body, Request, Response, Server, StatusCode}; -use hyper_router::{RouterBuilder, RouterService}; - -pub enum APIError { - MethodNotAllowed { desc: String }, - ServerError { desc: String }, - NotImplemented { desc: String }, +#[derive(PartialEq, Debug)] +pub enum ApiError { + MethodNotAllowed(String), + ServerError(String), + NotImplemented(String), + InvalidQueryParams(String), + NotFound(String), + ImATeapot(String), // Just in case. } -pub type APIResult = Result, APIError>; +pub type ApiResult = Result, ApiError>; -impl Into> for APIError { +impl Into> for ApiError { fn into(self) -> Response { let status_code: (StatusCode, String) = match self { - APIError::MethodNotAllowed { desc } => (StatusCode::METHOD_NOT_ALLOWED, desc), - APIError::ServerError { desc } => (StatusCode::INTERNAL_SERVER_ERROR, desc), - APIError::NotImplemented { desc } => (StatusCode::NOT_IMPLEMENTED, desc), + ApiError::MethodNotAllowed(desc) => (StatusCode::METHOD_NOT_ALLOWED, desc), + ApiError::ServerError(desc) => (StatusCode::INTERNAL_SERVER_ERROR, desc), + ApiError::NotImplemented(desc) => (StatusCode::NOT_IMPLEMENTED, desc), + ApiError::InvalidQueryParams(desc) => (StatusCode::BAD_REQUEST, desc), + ApiError::NotFound(desc) => (StatusCode::NOT_FOUND, desc), + ApiError::ImATeapot(desc) => (StatusCode::IM_A_TEAPOT, desc), }; Response::builder() .status(status_code.0) @@ -40,17 +45,31 @@ impl Into> for APIError { } } -pub trait APIService { - fn add_routes(&mut self, router_builder: RouterBuilder) -> Result; +impl From for ApiError { + fn from(e: store::Error) -> ApiError { + ApiError::ServerError(format!("Database error: {:?}", e)) + } +} + +impl From for ApiError { + fn from(e: types::BeaconStateError) -> ApiError { + ApiError::ServerError(format!("BeaconState error: {:?}", e)) + } +} + +impl From for ApiError { + fn from(e: state_processing::per_slot_processing::Error) -> ApiError { + ApiError::ServerError(format!("PerSlotProcessing error: {:?}", e)) + } } pub fn start_server( - config: &APIConfig, + config: &ApiConfig, executor: &TaskExecutor, beacon_chain: Arc>, log: &slog::Logger, ) -> Result { - let log = log.new(o!("Service" => "API")); + let log = log.new(o!("Service" => "Api")); // build a channel to kill the HTTP server let (exit_signal, exit) = exit_future::signal(); @@ -68,62 +87,65 @@ pub fn start_server( let server_log = log.clone(); let server_bc = beacon_chain.clone(); - // Create the service closure let service = move || { - //TODO: This router must be moved out of this closure, so it isn't rebuilt for every connection. - let mut router = build_router_service::(); - - // Clone our stateful objects, for use in handler closure - let service_log = server_log.clone(); - let service_bc = server_bc.clone(); + let log = server_log.clone(); + let beacon_chain = server_bc.clone(); // Create a simple handler for the router, inject our stateful objects into the request. - service_fn(move |mut req| { + service_fn_ok(move |mut req| { + req.extensions_mut().insert::(log.clone()); req.extensions_mut() - .insert::(service_log.clone()); - req.extensions_mut() - .insert::>>(service_bc.clone()); - router.call(req) + .insert::>>(beacon_chain.clone()); + + let path = req.uri().path().to_string(); + + // Route the request to the correct handler. + let result = match (req.method(), path.as_ref()) { + (&Method::GET, "/beacon/state") => beacon::get_state::(req), + (&Method::GET, "/beacon/state_root") => beacon::get_state_root::(req), + (&Method::GET, "/node/version") => node::get_version(req), + (&Method::GET, "/node/genesis_time") => node::get_genesis_time::(req), + _ => Err(ApiError::MethodNotAllowed(path.clone())), + }; + + match result { + // Return the `hyper::Response`. + Ok(response) => { + slog::debug!(log, "Request successful: {:?}", path); + response + } + // Map the `ApiError` into `hyper::Response`. + Err(e) => { + slog::debug!(log, "Request failure: {:?}", path); + e.into() + } + } }) }; + let log_clone = log.clone(); let server = Server::bind(&bind_addr) .serve(service) .with_graceful_shutdown(server_exit) .map_err(move |e| { warn!( - log, + log_clone, "API failed to start, Unable to bind"; "address" => format!("{:?}", e) ) }); + info!( + log, + "REST API started"; + "address" => format!("{}", config.listen_address), + "port" => config.port, + ); + executor.spawn(server); Ok(exit_signal) } -fn build_router_service() -> RouterService { - let mut router_builder = RouterBuilder::new(); - - let mut bn_service: BeaconNodeServiceInstance = BeaconNodeServiceInstance { - marker: std::marker::PhantomData, - }; - - router_builder = bn_service - .add_routes(router_builder) - .expect("The routes should always be made."); - - RouterService::new(router_builder.build()) -} - -fn path_from_request(req: &Request) -> String { - req.uri() - .path_and_query() - .as_ref() - .map(|pq| String::from(pq.as_str())) - .unwrap_or(String::new()) -} - fn success_response(body: Body) -> Response { Response::builder() .status(StatusCode::OK) diff --git a/beacon_node/rest_api/src/macros.rs b/beacon_node/rest_api/src/macros.rs deleted file mode 100644 index db9bfd848..000000000 --- a/beacon_node/rest_api/src/macros.rs +++ /dev/null @@ -1,23 +0,0 @@ -macro_rules! result_to_response { - ($handler: path) => { - |req: Request| -> Response { - let log = req - .extensions() - .get::() - .expect("Our logger should be on req.") - .clone(); - let path = path_from_request(&req); - let result = $handler(req); - match result { - Ok(response) => { - info!(log, "Request successful: {:?}", path); - response - } - Err(e) => { - info!(log, "Request failure: {:?}", path); - e.into() - } - } - } - }; -} diff --git a/beacon_node/rest_api/src/node.rs b/beacon_node/rest_api/src/node.rs new file mode 100644 index 000000000..4dbd41229 --- /dev/null +++ b/beacon_node/rest_api/src/node.rs @@ -0,0 +1,25 @@ +use crate::{success_response, ApiResult}; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use hyper::{Body, Request}; +use std::sync::Arc; +use version; + +/// Read the version string from the current Lighthouse build. +pub fn get_version(_req: Request) -> ApiResult { + let body = Body::from( + serde_json::to_string(&version::version()) + .expect("Version should always be serialializable as JSON."), + ); + Ok(success_response(body)) +} + +/// Read the genesis time from the current beacon chain state. +pub fn get_genesis_time(req: Request) -> ApiResult { + let beacon_chain = req.extensions().get::>>().unwrap(); + let gen_time: u64 = beacon_chain.head().beacon_state.genesis_time; + let body = Body::from( + serde_json::to_string(&gen_time) + .expect("Genesis should time always have a valid JSON serialization."), + ); + Ok(success_response(body)) +} diff --git a/beacon_node/rest_api/src/url_query.rs b/beacon_node/rest_api/src/url_query.rs new file mode 100644 index 000000000..d65312a9e --- /dev/null +++ b/beacon_node/rest_api/src/url_query.rs @@ -0,0 +1,112 @@ +use crate::ApiError; +use hyper::Request; + +/// Provides handy functions for parsing the query parameters of a URL. +pub struct UrlQuery<'a>(url::form_urlencoded::Parse<'a>); + +impl<'a> UrlQuery<'a> { + /// Instantiate from an existing `Request`. + /// + /// Returns `Err` if `req` does not contain any query parameters. + pub fn from_request(req: &'a Request) -> Result { + let query_str = req.uri().query().ok_or_else(|| { + ApiError::InvalidQueryParams( + "URL query must be valid and contain at least one + key." + .to_string(), + ) + })?; + + Ok(UrlQuery(url::form_urlencoded::parse(query_str.as_bytes()))) + } + + /// Returns the first `(key, value)` pair found where the `key` is in `keys`. + /// + /// If no match is found, an `InvalidQueryParams` error is returned. + pub fn first_of(mut self, keys: &[&str]) -> Result<(String, String), ApiError> { + self.0 + .find(|(key, _value)| keys.contains(&&**key)) + .map(|(key, value)| (key.into_owned(), value.into_owned())) + .ok_or_else(|| { + ApiError::InvalidQueryParams(format!( + "URL query must contain at least one of the following keys: {:?}", + keys + )) + }) + } + + /// Returns the value for `key`, if and only if `key` is the only key present in the query + /// parameters. + pub fn only_one(self, key: &str) -> Result { + let queries: Vec<_> = self + .0 + .map(|(k, v)| (k.into_owned(), v.into_owned())) + .collect(); + + if queries.len() == 1 { + let (first_key, first_value) = &queries[0]; // Must have 0 index if len is 1. + if first_key == key { + Ok(first_value.to_string()) + } else { + Err(ApiError::InvalidQueryParams(format!( + "Only the {} query parameter is supported", + key + ))) + } + } else { + Err(ApiError::InvalidQueryParams(format!( + "Only one query parameter is allowed, {} supplied", + queries.len() + ))) + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn only_one() { + let get_result = |addr: &str, key: &str| -> Result { + UrlQuery(url::Url::parse(addr).unwrap().query_pairs()).only_one(key) + }; + + assert_eq!(get_result("http://cat.io/?a=42", "a"), Ok("42".to_string())); + assert!(get_result("http://cat.io/?a=42", "b").is_err()); + assert!(get_result("http://cat.io/?a=42&b=12", "a").is_err()); + assert!(get_result("http://cat.io/", "").is_err()); + } + + #[test] + fn first_of() { + let url = url::Url::parse("http://lighthouse.io/cats?a=42&b=12&c=100").unwrap(); + let get_query = || UrlQuery(url.query_pairs()); + + assert_eq!( + get_query().first_of(&["a"]), + Ok(("a".to_string(), "42".to_string())) + ); + assert_eq!( + get_query().first_of(&["a", "b", "c"]), + Ok(("a".to_string(), "42".to_string())) + ); + assert_eq!( + get_query().first_of(&["a", "a", "a"]), + Ok(("a".to_string(), "42".to_string())) + ); + assert_eq!( + get_query().first_of(&["a", "b", "c"]), + Ok(("a".to_string(), "42".to_string())) + ); + assert_eq!( + get_query().first_of(&["b", "c"]), + Ok(("b".to_string(), "12".to_string())) + ); + assert_eq!( + get_query().first_of(&["c"]), + Ok(("c".to_string(), "100".to_string())) + ); + assert!(get_query().first_of(&["nothing"]).is_err()); + } +} diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index b34259f5a..2e3ad0691 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -210,7 +210,6 @@ fn main() { Arg::with_name("debug-level") .long("debug-level") .value_name("LEVEL") - .short("s") .help("The title of the spec constants for chain config.") .takes_value(true) .possible_values(&["info", "debug", "trace", "warn", "error", "crit"]) diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index c4e557b2d..4e47fceb2 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -24,6 +24,15 @@ impl<'a, U: Store, E: EthSpec> AncestorIter> for } } +impl<'a, U: Store, E: EthSpec> AncestorIter> for BeaconState { + /// Iterates across all the prior state roots of `self`, starting at the most recent and ending + /// at genesis. + fn try_iter_ancestor_roots(&self, store: Arc) -> Option> { + // The `self.clone()` here is wasteful. + Some(StateRootsIterator::owned(store, self.clone(), self.slot)) + } +} + #[derive(Clone)] pub struct StateRootsIterator<'a, T: EthSpec, U> { store: Arc,