diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index caa5b28e4..59c799105 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -7,8 +7,8 @@ edition = "2018" [dependencies] clap = "2.32.0" #SigP repository -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "61036890d574f5b46573952b20def2baafd6a6e9" } -enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "61036890d574f5b46573952b20def2baafd6a6e9", features = ["serde"] } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "76f7475e4b7063e663ad03c7524cf091f9961968" } +enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "76f7475e4b7063e663ad03c7524cf091f9961968", features = ["serde"] } types = { path = "../../eth2/types" } serde = "1.0" serde_derive = "1.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 2c574e46a..a47d32ec2 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -15,7 +15,7 @@ use libp2p::{ tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; -use slog::{debug, o, trace}; +use slog::{debug, o}; use std::num::NonZeroU32; use std::time::Duration; @@ -90,13 +90,15 @@ impl NetworkBehaviourEventProcess { - trace!(self.log, "Received GossipEvent"); - + GossipsubEvent::Message(propagation_source, gs_msg) => { + let id = gs_msg.id(); let msg = PubsubMessage::from_topics(&gs_msg.topics, gs_msg.data); + // Note: We are keeping track here of the peer that sent us the message, not the + // peer that originally published the message. self.events.push(BehaviourEvent::GossipMessage { - source: gs_msg.source, + id, + source: propagation_source, topics: gs_msg.topics, message: msg, }); @@ -199,6 +201,13 @@ impl Behaviour { } } + /// Forwards a message that is waiting in gossipsub's mcache. Messages are only propagated + /// once validated by the beacon chain. + pub fn propagate_message(&mut self, propagation_source: &PeerId, message_id: String) { + self.gossipsub + .propagate_message(&message_id, propagation_source); + } + /* Eth2 RPC behaviour functions */ /// Sends an RPC Request/Response via the RPC protocol. @@ -214,12 +223,21 @@ impl Behaviour { /// The types of events than can be obtained from polling the behaviour. pub enum BehaviourEvent { + /// A received RPC event and the peer that it was received from. RPC(PeerId, RPCEvent), + /// We have completed an initial connection to a new peer. PeerDialed(PeerId), + /// A peer has disconnected. PeerDisconnected(PeerId), + /// A gossipsub message has been received. GossipMessage { + /// The gossipsub message id. Used when propagating blocks after validation. + id: String, + /// The peer from which we received this message, not the peer that published it. source: PeerId, + /// The topics that this message was sent on. topics: Vec, + /// The message itself. message: PubsubMessage, }, } diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index 7cb501c1f..fd44b99af 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -74,7 +74,8 @@ impl Default for Config { // parameter. gs_config: GossipsubConfigBuilder::new() .max_transmit_size(1_048_576) - .heartbeat_interval(Duration::from_secs(20)) + .heartbeat_interval(Duration::from_secs(20)) // TODO: Reduce for mainnet + .propagate_messages(false) // require validation before propagation .build(), boot_nodes: vec![], libp2p_nodes: vec![], diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index c3f2522d8..69ca39ad7 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -169,6 +169,7 @@ where fn inject_connected(&mut self, peer_id: PeerId, _endpoint: ConnectedPoint) { self.connected_peers.insert(peer_id); + // TODO: Drop peers if over max_peer limit metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); metrics::set_gauge(&metrics::PEERS_CONNECTED, self.connected_peers() as i64); diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 96b5a276e..9f08b1eda 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -157,16 +157,16 @@ impl Stream for Service { fn poll(&mut self) -> Poll, Self::Error> { loop { match self.swarm.poll() { - //Behaviour events Ok(Async::Ready(Some(event))) => match event { - // TODO: Stub here for debugging BehaviourEvent::GossipMessage { + id, source, topics, message, } => { trace!(self.log, "Gossipsub message received"; "service" => "Swarm"); return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage { + id, source, topics, message, @@ -234,6 +234,7 @@ pub enum Libp2pEvent { PeerDisconnected(PeerId), /// Received pubsub message. PubsubMessage { + id: String, source: PeerId, topics: Vec, message: PubsubMessage, diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index c14fc970d..d6e9f8be8 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -21,6 +21,8 @@ pub struct MessageHandler { _chain: Arc>, /// The syncing framework. sync: SimpleSync, + /// A channel to the network service to allow for gossip propagation. + network_send: mpsc::UnboundedSender, /// The `MessageHandler` logger. log: slog::Logger, } @@ -34,8 +36,9 @@ pub enum HandlerMessage { PeerDisconnected(PeerId), /// An RPC response/request has been received. RPC(PeerId, RPCEvent), - /// A gossip message has been received. - PubsubMessage(PeerId, PubsubMessage), + /// A gossip message has been received. The fields are: message id, the peer that sent us this + /// message and the message itself. + PubsubMessage(String, PeerId, PubsubMessage), } impl MessageHandler { @@ -50,12 +53,13 @@ impl MessageHandler { let (handler_send, handler_recv) = mpsc::unbounded_channel(); // Initialise sync and begin processing in thread - let sync = SimpleSync::new(beacon_chain.clone(), network_send, &log); + let sync = SimpleSync::new(beacon_chain.clone(), network_send.clone(), &log); // generate the Message handler let mut handler = MessageHandler { _chain: beacon_chain.clone(), sync, + network_send, log: log.clone(), }; @@ -87,8 +91,8 @@ impl MessageHandler { self.handle_rpc_message(peer_id, rpc_event); } // An RPC message request/response has been received - HandlerMessage::PubsubMessage(peer_id, gossip) => { - self.handle_gossip(peer_id, gossip); + HandlerMessage::PubsubMessage(id, peer_id, gossip) => { + self.handle_gossip(id, peer_id, gossip); } } } @@ -194,24 +198,34 @@ impl MessageHandler { } /// Handle RPC messages - fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { + fn handle_gossip(&mut self, id: String, peer_id: PeerId, gossip_message: PubsubMessage) { match gossip_message { PubsubMessage::Block(message) => match self.decode_gossip_block(message) { Ok(block) => { - let _should_forward_on = self.sync.on_block_gossip(peer_id, block); + let should_forward_on = self.sync.on_block_gossip(peer_id.clone(), block); + // TODO: Apply more sophisticated validation and decoding logic + if should_forward_on { + self.propagate_message(id, peer_id.clone()); + } } Err(e) => { debug!(self.log, "Invalid gossiped beacon block"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e)); } }, PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) { - Ok(attestation) => self.sync.on_attestation_gossip(peer_id, attestation), + Ok(attestation) => { + // TODO: Apply more sophisticated validation and decoding logic + self.propagate_message(id, peer_id.clone()); + self.sync.on_attestation_gossip(peer_id, attestation); + } Err(e) => { debug!(self.log, "Invalid gossiped attestation"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e)); } }, PubsubMessage::VoluntaryExit(message) => match self.decode_gossip_exit(message) { Ok(_exit) => { + // TODO: Apply more sophisticated validation and decoding logic + self.propagate_message(id, peer_id.clone()); // TODO: Handle exits debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id) ); } @@ -222,6 +236,8 @@ impl MessageHandler { PubsubMessage::ProposerSlashing(message) => { match self.decode_gossip_proposer_slashing(message) { Ok(_slashing) => { + // TODO: Apply more sophisticated validation and decoding logic + self.propagate_message(id, peer_id.clone()); // TODO: Handle proposer slashings debug!(self.log, "Received a proposer slashing"; "peer_id" => format!("{}", peer_id) ); } @@ -233,6 +249,8 @@ impl MessageHandler { PubsubMessage::AttesterSlashing(message) => { match self.decode_gossip_attestation_slashing(message) { Ok(_slashing) => { + // TODO: Apply more sophisticated validation and decoding logic + self.propagate_message(id, peer_id.clone()); // TODO: Handle attester slashings debug!(self.log, "Received an attester slashing"; "peer_id" => format!("{}", peer_id) ); } @@ -248,6 +266,21 @@ impl MessageHandler { } } + /// Informs the network service that the message should be forwarded to other peers. + fn propagate_message(&mut self, message_id: String, propagation_source: PeerId) { + self.network_send + .try_send(NetworkMessage::Propagate { + propagation_source, + message_id, + }) + .unwrap_or_else(|_| { + warn!( + self.log, + "Could not send propagation request to the network service" + ) + }); + } + /* Decoding of gossipsub objects from the network. * * The decoding is done in the message handler as it has access to to a `BeaconChain` and can diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ae7562033..f54630615 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -159,12 +159,23 @@ fn network_service( // poll the network channel match network_recv.poll() { Ok(Async::Ready(Some(message))) => match message { - NetworkMessage::Send(peer_id, outgoing_message) => match outgoing_message { - OutgoingMessage::RPC(rpc_event) => { - trace!(log, "{}", rpc_event); - libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event); - } - }, + NetworkMessage::RPC(peer_id, rpc_event) => { + trace!(log, "{}", rpc_event); + libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event); + } + NetworkMessage::Propagate { + propagation_source, + message_id, + } => { + trace!(log, "Propagating gossipsub message"; + "propagation_peer" => format!("{:?}", propagation_source), + "message_id" => format!("{}", message_id), + ); + libp2p_service + .lock() + .swarm + .propagate_message(&propagation_source, message_id); + } NetworkMessage::Publish { topics, message } => { debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); libp2p_service.lock().swarm.publish(&topics, message); @@ -203,13 +214,14 @@ fn network_service( .map_err(|_| "Failed to send PeerDisconnected to handler")?; } Libp2pEvent::PubsubMessage { - source, message, .. + id, + source, + message, + .. } => { - //TODO: Decide if we need to propagate the topic upwards. (Potentially for - //attestations) message_handler_send - .try_send(HandlerMessage::PubsubMessage(source, message)) - .map_err(|_| " failed to send pubsub message to handler")?; + .try_send(HandlerMessage::PubsubMessage(id, source, message)) + .map_err(|_| "Failed to send pubsub message to handler")?; } }, Ok(Async::Ready(None)) => unreachable!("Stream never ends"), @@ -225,19 +237,16 @@ fn network_service( /// Types of messages that the network service can receive. #[derive(Debug)] pub enum NetworkMessage { - /// Send a message to libp2p service. - //TODO: Define typing for messages across the wire - Send(PeerId, OutgoingMessage), - /// Publish a message to pubsub mechanism. + /// Send an RPC message to the libp2p service. + RPC(PeerId, RPCEvent), + /// Publish a message to gossipsub. Publish { topics: Vec, message: PubsubMessage, }, -} - -/// Type of outgoing messages that can be sent through the network service. -#[derive(Debug)] -pub enum OutgoingMessage { - /// Send an RPC request/response. - RPC(RPCEvent), + /// Propagate a received gossipsub message + Propagate { + propagation_source: PeerId, + message_id: String, + }, } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 056453a68..9d05b312b 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,5 +1,5 @@ use super::manager::{ImportManager, ImportManagerOutcome}; -use crate::service::{NetworkMessage, OutgoingMessage}; +use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; @@ -466,7 +466,7 @@ impl SimpleSync { SHOULD_FORWARD_GOSSIP_BLOCK } BlockProcessingOutcome::BlockIsAlreadyKnown => SHOULD_FORWARD_GOSSIP_BLOCK, - _ => SHOULD_NOT_FORWARD_GOSSIP_BLOCK, + _ => SHOULD_NOT_FORWARD_GOSSIP_BLOCK, //TODO: Decide if we want to forward these } } else { SHOULD_NOT_FORWARD_GOSSIP_BLOCK @@ -558,12 +558,8 @@ impl NetworkContext { } fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { - self.send(peer_id, OutgoingMessage::RPC(rpc_event)) - } - - fn send(&mut self, peer_id: PeerId, outgoing_message: OutgoingMessage) { self.network_send - .try_send(NetworkMessage::Send(peer_id, outgoing_message)) + .try_send(NetworkMessage::RPC(peer_id, rpc_event)) .unwrap_or_else(|_| { warn!( self.log,