From 107bbdcccd66d4fa4125bc6f5b3f4fec3353032f Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 6 Aug 2019 17:54:38 +1000 Subject: [PATCH] Updates to latest interop branch. - Shifts decoding of objects into message handler. - Updates to latest interop gossipsub. - Adds interop spec constant. --- beacon_node/eth2-libp2p/Cargo.toml | 4 +- beacon_node/eth2-libp2p/src/behaviour.rs | 74 ++++++------- beacon_node/eth2-libp2p/src/config.rs | 2 +- beacon_node/eth2-libp2p/src/lib.rs | 2 +- beacon_node/eth2-libp2p/src/rpc/handler.rs | 17 ++- beacon_node/eth2-libp2p/src/rpc/mod.rs | 12 +-- beacon_node/eth2-libp2p/src/service.rs | 23 ++-- beacon_node/http_server/src/lib.rs | 2 +- beacon_node/network/src/message_handler.rs | 101 ++++++++++++------ beacon_node/network/src/service.rs | 33 +++--- beacon_node/network/src/sync/simple_sync.rs | 38 +++---- beacon_node/rpc/src/attestation.rs | 8 +- beacon_node/rpc/src/beacon_block.rs | 12 +-- beacon_node/rpc/src/lib.rs | 2 +- .../src/beacon_state/beacon_state_types.rs | 24 ++++- validator_client/src/main.rs | 2 +- 16 files changed, 199 insertions(+), 157 deletions(-) diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index f5fe8a877..0ea182bc6 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -7,8 +7,8 @@ edition = "2018" [dependencies] clap = "2.32.0" #SigP repository -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "35104cca27231b9178e1fea5b3788ea41ba8af76" } -enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "35104cca27231b9178e1fea5b3788ea41ba8af76", features = ["serde"] } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b0d3cf7b4b0fa6c555b64dbdd110673a05457abd" } +enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "b0d3cf7b4b0fa6c555b64dbdd110673a05457abd", features = ["serde"] } types = { path = "../../eth2/types" } serde = "1.0" serde_derive = "1.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index fcb147949..fc224e91a 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -2,6 +2,7 @@ use crate::discovery::Discovery; use crate::rpc::{RPCEvent, RPCMessage, RPC}; use crate::{error, NetworkConfig}; use crate::{Topic, TopicHash}; +use crate::{BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC}; use futures::prelude::*; use libp2p::{ core::identity::Keypair, @@ -13,11 +14,10 @@ use libp2p::{ tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; -use slog::{debug, o, trace, warn}; -use ssz::{ssz_encode, Decode, DecodeError, Encode}; +use slog::{debug, o, trace}; +use ssz::{ssz_encode, Encode}; use std::num::NonZeroU32; use std::time::Duration; -use types::{Attestation, BeaconBlock}; /// Builds the network behaviour that manages the core protocols of eth2. /// This core behaviour is managed by `Behaviour` which adds peer management to all core @@ -87,23 +87,12 @@ impl NetworkBehaviourEventProcess { trace!(self.log, "Received GossipEvent"; "msg" => format!("{:?}", gs_msg)); - let pubsub_message = match PubsubMessage::from_ssz_bytes(&gs_msg.data) { - //TODO: Punish peer on error - Err(e) => { - warn!( - self.log, - "Received undecodable message from Peer {:?} error", gs_msg.source; - "error" => format!("{:?}", e) - ); - return; - } - Ok(msg) => msg, - }; + let msg = PubsubMessage::from_topics(&gs_msg.topics, gs_msg.data); self.events.push(BehaviourEvent::GossipMessage { source: gs_msg.source, topics: gs_msg.topics, - message: Box::new(pubsub_message), + message: msg, }); } GossipsubEvent::Subscribed { .. } => {} @@ -225,7 +214,7 @@ pub enum BehaviourEvent { GossipMessage { source: PeerId, topics: Vec, - message: Box, + message: PubsubMessage, }, } @@ -233,41 +222,50 @@ pub enum BehaviourEvent { #[derive(Debug, Clone, PartialEq)] pub enum PubsubMessage { /// Gossipsub message providing notification of a new block. - Block(BeaconBlock), + Block(Vec), /// Gossipsub message providing notification of a new attestation. - Attestation(Attestation), + Attestation(Vec), + /// Gossipsub message from an unknown topic. + Unknown(Vec), +} + +impl PubsubMessage { + /* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will + * need to be modified. + * + * Also note that a message can be associated with many topics. As soon as one of the topics is + * known we match. If none of the topics are known we return an unknown state. + */ + fn from_topics(topics: &Vec, data: Vec) -> Self { + for topic in topics { + match topic.as_str() { + BEACON_BLOCK_TOPIC => return PubsubMessage::Block(data), + BEACON_ATTESTATION_TOPIC => return PubsubMessage::Attestation(data), + _ => {} + } + } + PubsubMessage::Unknown(data) + } } -//TODO: Correctly encode/decode enums. Prefixing with integer for now. impl Encode for PubsubMessage { fn is_ssz_fixed_len() -> bool { false } fn ssz_append(&self, buf: &mut Vec) { - let offset = ::ssz_fixed_len() + as Encode>::ssz_fixed_len(); - - let mut encoder = ssz::SszEncoder::container(buf, offset); - match self { - PubsubMessage::Block(block_gossip) => { - encoder.append(&0_u32); - + PubsubMessage::Block(inner) + | PubsubMessage::Attestation(inner) + | PubsubMessage::Unknown(inner) => { // Encode the gossip as a Vec; - encoder.append(&block_gossip.as_ssz_bytes()); - } - PubsubMessage::Attestation(attestation_gossip) => { - encoder.append(&1_u32); - - // Encode the gossip as a Vec; - encoder.append(&attestation_gossip.as_ssz_bytes()); + buf.append(&mut inner.as_ssz_bytes()); } } - - encoder.finalize(); } } +/* impl Decode for PubsubMessage { fn is_ssz_fixed_len() -> bool { false @@ -295,7 +293,9 @@ impl Decode for PubsubMessage { } } } +*/ +/* #[cfg(test)] mod test { use super::*; @@ -313,4 +313,6 @@ mod test { assert_eq!(original, decoded); } + } +*/ diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index 44d07795b..ddf14cc04 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -7,7 +7,7 @@ use std::path::PathBuf; use std::time::Duration; /// The beacon node topic string to subscribe to. -pub const BEACON_PUBSUB_TOPIC: &str = "beacon_block"; +pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; pub const BEACON_ATTESTATION_TOPIC: &str = "beacon_attestation"; pub const SHARD_TOPIC_PREFIX: &str = "shard"; diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index ca6ac3760..54a4f2a99 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -11,7 +11,7 @@ mod service; pub use behaviour::PubsubMessage; pub use config::{ - Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC, SHARD_TOPIC_PREFIX, + Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC, SHARD_TOPIC_PREFIX, }; pub use libp2p::gossipsub::{Topic, TopicHash}; pub use libp2p::multiaddr; diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 76e04d24e..355cc52ee 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -12,16 +12,14 @@ use libp2p::swarm::protocols_handler::{ use smallvec::SmallVec; use std::time::{Duration, Instant}; use tokio_io::{AsyncRead, AsyncWrite}; -use types::EthSpec; /// The time (in seconds) before a substream that is awaiting a response times out. pub const RESPONSE_TIMEOUT: u64 = 9; /// Implementation of `ProtocolsHandler` for the RPC protocol. -pub struct RPCHandler +pub struct RPCHandler where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, @@ -56,8 +54,8 @@ where /// After the given duration has elapsed, an inactive connection will shutdown. inactive_timeout: Duration, - /// Phantom EthSpec. - _phantom: PhantomData, + /// Marker to pin the generic stream. + _phantom: PhantomData, } /// An outbound substream is waiting a response from the user. @@ -90,10 +88,9 @@ where }, } -impl RPCHandler +impl RPCHandler where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { pub fn new( listen_protocol: SubstreamProtocol, @@ -145,20 +142,18 @@ where } } -impl Default for RPCHandler +impl Default for RPCHandler where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { fn default() -> Self { RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(30)) } } -impl ProtocolsHandler for RPCHandler +impl ProtocolsHandler for RPCHandler where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { type InEvent = RPCEvent; type OutEvent = RPCEvent; diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 5593660ff..756a62e71 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -16,7 +16,6 @@ pub use protocol::{RPCError, RPCProtocol, RPCRequest}; use slog::o; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; -use types::EthSpec; pub(crate) mod codec; mod handler; @@ -50,16 +49,16 @@ impl RPCEvent { /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. -pub struct RPC { +pub struct RPC { /// Queue of events to processed. events: Vec>, /// Pins the generic substream. - marker: PhantomData<(TSubstream, E)>, + marker: PhantomData<(TSubstream)>, /// Slog logger for RPC behaviour. _log: slog::Logger, } -impl RPC { +impl RPC { pub fn new(log: &slog::Logger) -> Self { let log = log.new(o!("Service" => "Libp2p-RPC")); RPC { @@ -80,12 +79,11 @@ impl RPC { } } -impl NetworkBehaviour for RPC +impl NetworkBehaviour for RPC where TSubstream: AsyncRead + AsyncWrite, - E: EthSpec, { - type ProtocolsHandler = RPCHandler; + type ProtocolsHandler = RPCHandler; type OutEvent = RPCMessage; fn new_handler(&mut self) -> Self::ProtocolsHandler { diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 5a2fc8d8b..316aa0579 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -4,7 +4,7 @@ use crate::multiaddr::Protocol; use crate::rpc::RPCEvent; use crate::NetworkConfig; use crate::{Topic, TopicHash}; -use crate::{BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC}; +use crate::{BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC}; use futures::prelude::*; use futures::Stream; use libp2p::core::{ @@ -21,25 +21,24 @@ use std::fs::File; use std::io::prelude::*; use std::io::{Error, ErrorKind}; use std::time::Duration; -use types::EthSpec; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; -type Libp2pBehaviour = Behaviour, E>; +type Libp2pBehaviour = Behaviour>; const NETWORK_KEY_FILENAME: &str = "key"; /// The configuration and state of the libp2p components for the beacon node. -pub struct Service { +pub struct Service { /// The libp2p Swarm handler. //TODO: Make this private - pub swarm: Swarm>, + pub swarm: Swarm, /// This node's PeerId. _local_peer_id: PeerId, /// The libp2p logger handle. pub log: slog::Logger, } -impl Service { +impl Service { pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result { debug!(log, "Network-libp2p Service starting"); @@ -92,7 +91,7 @@ impl Service { //TODO: Handle multiple shard attestations. For now we simply use a separate topic for // attestations topics.push(Topic::new(BEACON_ATTESTATION_TOPIC.into())); - topics.push(Topic::new(BEACON_PUBSUB_TOPIC.into())); + topics.push(Topic::new(BEACON_BLOCK_TOPIC.into())); topics.append( &mut config .topics @@ -121,8 +120,8 @@ impl Service { } } -impl Stream for Service { - type Item = Libp2pEvent; +impl Stream for Service { + type Item = Libp2pEvent; type Error = crate::error::Error; fn poll(&mut self) -> Poll, Self::Error> { @@ -136,7 +135,7 @@ impl Stream for Service { topics, message, } => { - trace!(self.log, "Pubsub message received: {:?}", message); + trace!(self.log, "Gossipsub message received"; "Message" => format!("{:?}", message)); return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage { source, topics, @@ -196,7 +195,7 @@ fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox) } /// Events that can be obtained from polling the Libp2p Service. -pub enum Libp2pEvent { +pub enum Libp2pEvent { /// An RPC response request has been received on the swarm. RPC(PeerId, RPCEvent), /// Initiated the connection to a new peer. @@ -207,7 +206,7 @@ pub enum Libp2pEvent { PubsubMessage { source: PeerId, topics: Vec, - message: Box>, + message: PubsubMessage, }, } diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs index b20e43de8..f1d006a5b 100644 --- a/beacon_node/http_server/src/lib.rs +++ b/beacon_node/http_server/src/lib.rs @@ -76,7 +76,7 @@ pub fn create_iron_http_server( pub fn start_service( config: &HttpServerConfig, executor: &TaskExecutor, - _network_chan: mpsc::UnboundedSender>, + _network_chan: mpsc::UnboundedSender, beacon_chain: Arc>, db_path: PathBuf, metrics_registry: Registry, diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index eaddce533..72a507ad7 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -14,7 +14,7 @@ use slog::{debug, warn}; use ssz::{Decode, DecodeError}; use std::sync::Arc; use tokio::sync::mpsc; -use types::{BeaconBlockHeader, EthSpec}; +use types::{Attestation, BeaconBlock, BeaconBlockHeader}; /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { @@ -23,14 +23,14 @@ pub struct MessageHandler { /// The syncing framework. sync: SimpleSync, /// The context required to send messages to, and process messages from peers. - network_context: NetworkContext, + network_context: NetworkContext, /// The `MessageHandler` logger. log: slog::Logger, } /// Types of messages the handler can receive. #[derive(Debug)] -pub enum HandlerMessage { +pub enum HandlerMessage { /// We have initiated a connection to a new peer. PeerDialed(PeerId), /// Peer has disconnected, @@ -38,17 +38,17 @@ pub enum HandlerMessage { /// An RPC response/request has been received. RPC(PeerId, RPCEvent), /// A gossip message has been received. - PubsubMessage(PeerId, Box>), + PubsubMessage(PeerId, PubsubMessage), } impl MessageHandler { /// Initializes and runs the MessageHandler. pub fn spawn( beacon_chain: Arc>, - network_send: mpsc::UnboundedSender>, + network_send: mpsc::UnboundedSender, executor: &tokio::runtime::TaskExecutor, log: slog::Logger, - ) -> error::Result>> { + ) -> error::Result> { debug!(log, "Service starting"); let (handler_send, handler_recv) = mpsc::unbounded_channel(); @@ -78,7 +78,7 @@ impl MessageHandler { } /// Handle all messages incoming from the network service. - fn handle_message(&mut self, message: HandlerMessage) { + fn handle_message(&mut self, message: HandlerMessage) { match message { // we have initiated a connection to a peer HandlerMessage::PeerDialed(peer_id) => { @@ -94,7 +94,7 @@ impl MessageHandler { } // we have received an RPC message request/response HandlerMessage::PubsubMessage(peer_id, gossip) => { - self.handle_gossip(peer_id, *gossip); + self.handle_gossip(peer_id, gossip); } } } @@ -218,6 +218,62 @@ impl MessageHandler { } } + /// Handle various RPC errors + fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { + //TODO: Handle error correctly + warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error)); + } + + /// Handle RPC messages + fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { + match gossip_message { + PubsubMessage::Block(message) => match self.decode_gossip_block(message) { + Err(e) => { + debug!(self.log, "Invalid Gossiped Beacon Block"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e)); + } + Ok(block) => { + let _should_forward_on = + self.sync + .on_block_gossip(peer_id, block, &mut self.network_context); + } + }, + PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) { + Err(e) => { + debug!(self.log, "Invalid Gossiped Attestation"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e)); + } + Ok(attestation) => { + self.sync + .on_attestation_gossip(peer_id, attestation, &mut self.network_context) + } + }, + PubsubMessage::Unknown(message) => { + // Received a message from an unknown topic. Ignore for now + debug!(self.log, "Unknown Gossip Message"; "Peer" => format!("{}", peer_id), "Message" => format!("{:?}", message)); + } + } + } + + /* Decoding of blocks and attestations from the network. + * + * TODO: Apply efficient decoding/verification of these objects + */ + + fn decode_gossip_block( + &self, + beacon_block: Vec, + ) -> Result, DecodeError> { + //TODO: Apply verification before decoding. + BeaconBlock::from_ssz_bytes(&beacon_block) + } + + fn decode_gossip_attestation( + &self, + beacon_block: Vec, + ) -> Result, DecodeError> { + //TODO: Apply verification before decoding. + Attestation::from_ssz_bytes(&beacon_block) + } + /// Verifies and decodes the ssz-encoded block bodies received from peers. fn decode_block_bodies( &self, @@ -241,39 +297,18 @@ impl MessageHandler { //TODO: Implement faster header verification before decoding entirely Vec::from_ssz_bytes(&headers_response.headers) } - - /// Handle various RPC errors - fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { - //TODO: Handle error correctly - warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error)); - } - - /// Handle RPC messages - fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { - match gossip_message { - PubsubMessage::Block(message) => { - let _should_forward_on = - self.sync - .on_block_gossip(peer_id, message, &mut self.network_context); - } - PubsubMessage::Attestation(message) => { - self.sync - .on_attestation_gossip(peer_id, message, &mut self.network_context) - } - } - } } // TODO: RPC Rewrite makes this struct fairly pointless -pub struct NetworkContext { +pub struct NetworkContext { /// The network channel to relay messages to the Network service. - network_send: mpsc::UnboundedSender>, + network_send: mpsc::UnboundedSender, /// The `MessageHandler` logger. log: slog::Logger, } -impl NetworkContext { - pub fn new(network_send: mpsc::UnboundedSender>, log: slog::Logger) -> Self { +impl NetworkContext { + pub fn new(network_send: mpsc::UnboundedSender, log: slog::Logger) -> Self { Self { network_send, log } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7a21f7f28..e5ca2a917 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -14,13 +14,12 @@ use slog::{debug, info, o, trace}; use std::sync::Arc; use tokio::runtime::TaskExecutor; use tokio::sync::{mpsc, oneshot}; -use types::EthSpec; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { - libp2p_service: Arc>>, + libp2p_service: Arc>, _libp2p_exit: oneshot::Sender<()>, - _network_send: mpsc::UnboundedSender>, + _network_send: mpsc::UnboundedSender, _phantom: PhantomData, //message_handler: MessageHandler, //message_handler_send: Sender } @@ -31,9 +30,9 @@ impl Service { config: &NetworkConfig, executor: &TaskExecutor, log: slog::Logger, - ) -> error::Result<(Arc, mpsc::UnboundedSender>)> { + ) -> error::Result<(Arc, mpsc::UnboundedSender)> { // build the network channel - let (network_send, network_recv) = mpsc::unbounded_channel::>(); + let (network_send, network_recv) = mpsc::unbounded_channel::(); // launch message handler thread let message_handler_log = log.new(o!("Service" => "MessageHandler")); let message_handler_send = MessageHandler::spawn( @@ -65,15 +64,15 @@ impl Service { Ok((Arc::new(network_service), network_send)) } - pub fn libp2p_service(&self) -> Arc>> { + pub fn libp2p_service(&self) -> Arc> { self.libp2p_service.clone() } } -fn spawn_service( - libp2p_service: Arc>>, - network_recv: mpsc::UnboundedReceiver>, - message_handler_send: mpsc::UnboundedSender>, +fn spawn_service( + libp2p_service: Arc>, + network_recv: mpsc::UnboundedReceiver, + message_handler_send: mpsc::UnboundedSender, executor: &TaskExecutor, log: slog::Logger, ) -> error::Result> { @@ -99,10 +98,10 @@ fn spawn_service( } //TODO: Potentially handle channel errors -fn network_service( - libp2p_service: Arc>>, - mut network_recv: mpsc::UnboundedReceiver>, - mut message_handler_send: mpsc::UnboundedSender>, +fn network_service( + libp2p_service: Arc>, + mut network_recv: mpsc::UnboundedReceiver, + mut message_handler_send: mpsc::UnboundedSender, log: slog::Logger, ) -> impl futures::Future { futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> { @@ -119,7 +118,7 @@ fn network_service( }, NetworkMessage::Publish { topics, message } => { debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); - libp2p_service.lock().swarm.publish(topics, *message); + libp2p_service.lock().swarm.publish(topics, message); } }, Ok(Async::NotReady) => break, @@ -176,14 +175,14 @@ fn network_service( /// Types of messages that the network service can receive. #[derive(Debug)] -pub enum NetworkMessage { +pub enum NetworkMessage { /// Send a message to libp2p service. //TODO: Define typing for messages across the wire Send(PeerId, OutgoingMessage), /// Publish a message to pubsub mechanism. Publish { topics: Vec, - message: Box>, + message: PubsubMessage, }, } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 9a9d15503..40a1881dd 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -123,7 +123,7 @@ impl SimpleSync { /// Handle the connection of a new peer. /// /// Sends a `Hello` message to the peer. - pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { + pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { info!(self.log, "PeerConnected"; "peer" => format!("{:?}", peer_id)); network.send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain))); @@ -137,7 +137,7 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, hello: HelloMessage, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); @@ -156,7 +156,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, hello: HelloMessage, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id)); @@ -171,7 +171,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, hello: HelloMessage, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { let remote = PeerSyncInfo::from(hello); let local = PeerSyncInfo::from(&self.chain); @@ -278,7 +278,7 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, req: BeaconBlockRootsRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -323,7 +323,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, res: BeaconBlockRootsResponse, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -387,7 +387,7 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, req: BeaconBlockHeadersRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -440,7 +440,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, headers: Vec, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -472,7 +472,7 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, req: BeaconBlockBodiesRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { let block_bodies: Vec> = req .block_roots @@ -518,7 +518,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, res: DecodedBeaconBlockBodiesResponse, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -557,7 +557,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, block: BeaconBlock, - network: &mut NetworkContext, + network: &mut NetworkContext, ) -> bool { if let Some(outcome) = self.process_block(peer_id.clone(), block.clone(), network, &"gossip") @@ -627,7 +627,7 @@ impl SimpleSync { &mut self, _peer_id: PeerId, msg: Attestation, - _network: &mut NetworkContext, + _network: &mut NetworkContext, ) { match self.chain.process_attestation(msg) { Ok(()) => info!(self.log, "ImportedAttestation"; "source" => "gossip"), @@ -642,7 +642,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, req: BeaconBlockRootsRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { // Potentially set state to sync. if self.state == SyncState::Idle && req.count > SLOT_IMPORT_TOLERANCE { @@ -666,7 +666,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, req: BeaconBlockHeadersRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -683,7 +683,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, req: BeaconBlockBodiesRequest, - network: &mut NetworkContext, + network: &mut NetworkContext, ) { debug!( self.log, @@ -719,7 +719,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, block_root: Hash256, - network: &mut NetworkContext, + network: &mut NetworkContext, source: &str, ) -> Option { match self.import_queue.attempt_complete_block(block_root) { @@ -812,7 +812,7 @@ impl SimpleSync { &mut self, peer_id: PeerId, block: BeaconBlock, - network: &mut NetworkContext, + network: &mut NetworkContext, source: &str, ) -> Option { let processing_result = self.chain.process_block(block.clone()); @@ -917,8 +917,8 @@ fn hello_message(beacon_chain: &BeaconChain) -> HelloMes network_id: spec.network_id, //TODO: Correctly define the chain id chain_id: spec.network_id as u64, - latest_finalized_root: state.finalized_root, - latest_finalized_epoch: state.finalized_epoch, + latest_finalized_root: state.finalized_checkpoint.root, + latest_finalized_epoch: state.finalized_checkpoint.epoch, best_root: beacon_chain.head().beacon_block_root, best_slot: state.slot, } diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index cbbe4de6e..3de3639d8 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -11,7 +11,7 @@ use protos::services::{ }; use protos::services_grpc::AttestationService; use slog::{error, info, trace, warn}; -use ssz::{ssz_encode, Decode}; +use ssz::{ssz_encode, Decode, Encode}; use std::sync::Arc; use tokio::sync::mpsc; use types::Attestation; @@ -19,7 +19,7 @@ use types::Attestation; #[derive(Clone)] pub struct AttestationServiceInstance { pub chain: Arc>, - pub network_chan: mpsc::UnboundedSender>, + pub network_chan: mpsc::UnboundedSender, pub log: slog::Logger, } @@ -141,12 +141,12 @@ impl AttestationService for AttestationServiceInstance { // valid attestation, propagate to the network let topic = Topic::new(BEACON_ATTESTATION_TOPIC.into()); - let message = PubsubMessage::Attestation(attestation); + let message = PubsubMessage::Attestation(attestation.as_ssz_bytes()); self.network_chan .try_send(NetworkMessage::Publish { topics: vec![topic], - message: Box::new(message), + message: message, }) .unwrap_or_else(|e| { error!( diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index 2a8ae2c6b..b1a67399e 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -1,5 +1,5 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; -use eth2_libp2p::BEACON_PUBSUB_TOPIC; +use eth2_libp2p::BEACON_BLOCK_TOPIC; use eth2_libp2p::{PubsubMessage, Topic}; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; @@ -11,7 +11,7 @@ use protos::services::{ use protos::services_grpc::BeaconBlockService; use slog::Logger; use slog::{error, info, trace, warn}; -use ssz::{ssz_encode, Decode}; +use ssz::{ssz_encode, Decode, Encode}; use std::sync::Arc; use tokio::sync::mpsc; use types::{BeaconBlock, Signature, Slot}; @@ -19,7 +19,7 @@ use types::{BeaconBlock, Signature, Slot}; #[derive(Clone)] pub struct BeaconBlockServiceInstance { pub chain: Arc>, - pub network_chan: mpsc::UnboundedSender>, + pub network_chan: mpsc::UnboundedSender, pub log: Logger, } @@ -106,14 +106,14 @@ impl BeaconBlockService for BeaconBlockServiceInstance { ); // get the network topic to send on - let topic = Topic::new(BEACON_PUBSUB_TOPIC.into()); - let message = PubsubMessage::Block(block); + let topic = Topic::new(BEACON_BLOCK_TOPIC.into()); + let message = PubsubMessage::Block(block.as_ssz_bytes()); // Publish the block to the p2p network via gossipsub. self.network_chan .try_send(NetworkMessage::Publish { topics: vec![topic], - message: Box::new(message), + message: message, }) .unwrap_or_else(|e| { error!( diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index de9039505..eef009292 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -25,7 +25,7 @@ use tokio::sync::mpsc; pub fn start_server( config: &RPCConfig, executor: &TaskExecutor, - network_chan: mpsc::UnboundedSender>, + network_chan: mpsc::UnboundedSender, beacon_chain: Arc>, log: &slog::Logger, ) -> exit_future::Signal { diff --git a/eth2/types/src/beacon_state/beacon_state_types.rs b/eth2/types/src/beacon_state/beacon_state_types.rs index dd6ca3272..0e76942dd 100644 --- a/eth2/types/src/beacon_state/beacon_state_types.rs +++ b/eth2/types/src/beacon_state/beacon_state_types.rs @@ -207,12 +207,26 @@ pub struct InteropEthSpec; impl EthSpec for InteropEthSpec { type ShardCount = U8; - type SlotsPerHistoricalRoot = U64; - type LatestRandaoMixesLength = U64; - type LatestActiveIndexRootsLength = U64; - type LatestSlashedExitLength = U64; type SlotsPerEpoch = U8; - type GenesisEpoch = U0; + type SlotsPerHistoricalRoot = U64; + type SlotsPerEth1VotingPeriod = U16; + type EpochsPerHistoricalVector = U64; + type EpochsPerSlashingsVector = U64; + type MaxPendingAttestations = U1024; // 128 max attestations * 8 slots per epoch + + params_from_eth_spec!(MainnetEthSpec { + JustificationBitsLength, + MaxValidatorsPerCommittee, + GenesisEpoch, + HistoricalRootsLimit, + ValidatorRegistryLimit, + MaxProposerSlashings, + MaxAttesterSlashings, + MaxAttestations, + MaxDeposits, + MaxVoluntaryExits, + MaxTransfers + }); fn default_spec() -> ChainSpec { ChainSpec::interop() diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index 756f82991..76acb2f1a 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -214,7 +214,7 @@ fn main() { eth2_config, log.clone(), ), - "interop" => ValidatorService::::start::( + "interop" => ValidatorService::::start( client_config, eth2_config, log.clone(),