From 52b31b200940850b265ae585d567f3a286f0fae1 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 25 Mar 2019 23:02:51 +1100 Subject: [PATCH] Implement initial pubsub message handling --- beacon_node/eth2-libp2p/src/behaviour.rs | 57 +++++++++++++++------- beacon_node/eth2-libp2p/src/lib.rs | 3 +- beacon_node/eth2-libp2p/src/service.rs | 26 +++++++--- beacon_node/network/src/beacon_chain.rs | 2 +- beacon_node/network/src/message_handler.rs | 12 ++--- beacon_node/network/src/service.rs | 36 ++++++++------ beacon_node/rpc/Cargo.toml | 2 + beacon_node/rpc/src/beacon_block.rs | 6 ++- eth2/types/src/lib.rs | 2 +- 9 files changed, 95 insertions(+), 51 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index b3c4213b1..0229d06d5 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -13,11 +13,11 @@ use libp2p::{ tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; -use slog::{debug, o}; +use slog::{debug, o, warn}; use ssz::{ssz_encode, Decodable, DecodeError, Encodable, SszStream}; use ssz_derive::{Decode, Encode}; use types::Attestation; -use types::Topic; +use types::{Topic, TopicHash}; /// Builds the network behaviour for the libp2p Swarm. /// Implements gossipsub message routing. @@ -48,13 +48,33 @@ impl NetworkBehaviourEventProcess { - let gs_message = String::from_utf8_lossy(&message.data); - // TODO: Remove this type - debug only - self.events - .push(BehaviourEvent::Message(gs_message.to_string())) + GossipsubEvent::Message(gs_msg) => { + let pubsub_message = match PubsubMessage::ssz_decode(&gs_msg.data, 0) { + //TODO: Punish peer on error + Err(e) => { + warn!( + self.log, + "Received undecodable message from Peer {:?}", gs_msg.source + ); + return; + } + Ok((msg, _index)) => msg, + }; + + self.events.push(BehaviourEvent::GossipMessage { + source: gs_msg.source, + topics: gs_msg.topics, + message: pubsub_message, + }); } - _ => {} + GossipsubEvent::Subscribed { + peer_id: _, + topic: _, + } + | GossipsubEvent::Unsubscribed { + peer_id: _, + topic: _, + } => {} } } } @@ -125,15 +145,6 @@ impl Behaviour { } } - /* Behaviour functions */ - - /// Publishes a message on the pubsub (gossipsub) behaviour. - pub fn publish(&mut self, topic: Topic, message: PubsubMessage) { - //encode the message - let message_bytes = ssz_encode(&message); - self.gossipsub.publish(topic, message_bytes); - } - /// Consumes the events list when polled. fn poll( &mut self, @@ -157,6 +168,12 @@ impl Behaviour { pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { self.serenity_rpc.send_rpc(peer_id, rpc_event); } + + /// Publishes a message on the pubsub (gossipsub) behaviour. + pub fn publish(&mut self, topic: Topic, message: PubsubMessage) { + let message_bytes = ssz_encode(&message); + self.gossipsub.publish(topic, message_bytes); + } } /// The types of events than can be obtained from polling the behaviour. @@ -165,7 +182,11 @@ pub enum BehaviourEvent { PeerDialed(PeerId), Identified(PeerId, IdentifyInfo), // TODO: This is a stub at the moment - Message(String), + GossipMessage { + source: PeerId, + topics: Vec, + message: PubsubMessage, + }, } /// Gossipsub message providing notification of a new block. diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index f7a961bb2..659d6b01c 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -8,12 +8,13 @@ pub mod error; pub mod rpc; mod service; +pub use behaviour::PubsubMessage; pub use config::Config as NetworkConfig; pub use libp2p::{ gossipsub::{GossipsubConfig, GossipsubConfigBuilder}, PeerId, }; -pub use rpc::{HelloMessage, RPCEvent}; +pub use rpc::RPCEvent; pub use service::Libp2pEvent; pub use service::Service; pub use types::multiaddr; diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index e68df2d38..73facee72 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -1,4 +1,4 @@ -use crate::behaviour::{Behaviour, BehaviourEvent}; +use crate::behaviour::{Behaviour, BehaviourEvent, PubsubMessage}; use crate::error; use crate::multiaddr::Protocol; use crate::rpc::RPCEvent; @@ -16,7 +16,7 @@ use libp2p::{core, secio, PeerId, Swarm, Transport}; use slog::{debug, info, trace, warn}; use std::io::{Error, ErrorKind}; use std::time::Duration; -use types::TopicBuilder; +use types::{TopicBuilder, TopicHash}; /// The configuration and state of the libp2p components for the beacon node. pub struct Service { @@ -107,9 +107,17 @@ impl Stream for Service { //Behaviour events Ok(Async::Ready(Some(event))) => match event { // TODO: Stub here for debugging - BehaviourEvent::Message(m) => { - debug!(self.log, "Message received: {}", m); - return Ok(Async::Ready(Some(Libp2pEvent::Message(m)))); + BehaviourEvent::GossipMessage { + source, + topics, + message, + } => { + debug!(self.log, "Pubsub message received: {:?}", message); + return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage { + source, + topics, + message, + }))); } BehaviourEvent::RPC(peer_id, event) => { return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event)))); @@ -173,6 +181,10 @@ pub enum Libp2pEvent { PeerDialed(PeerId), /// Received information about a peer on the network. Identified(PeerId, IdentifyInfo), - // TODO: Pub-sub testing only. - Message(String), + /// Received pubsub message. + PubsubMessage { + source: PeerId, + topics: Vec, + message: PubsubMessage, + }, } diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs index c627912a4..8ec8162ff 100644 --- a/beacon_node/network/src/beacon_chain.rs +++ b/beacon_node/network/src/beacon_chain.rs @@ -7,7 +7,7 @@ use beacon_chain::{ types::{BeaconState, ChainSpec}, AggregationOutcome, CheckPoint, }; -use eth2_libp2p::HelloMessage; +use eth2_libp2p::rpc::HelloMessage; use types::{Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome}; diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index dcfee96a0..0efa6b96f 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -4,7 +4,7 @@ use crate::service::{NetworkMessage, OutgoingMessage}; use crate::sync::SimpleSync; use crossbeam_channel::{unbounded as channel, Sender}; use eth2_libp2p::{ - behaviour::IncomingGossip, + behaviour::PubsubMessage, rpc::{methods::GoodbyeReason, RPCRequest, RPCResponse, RequestId}, PeerId, RPCEvent, }; @@ -41,7 +41,7 @@ pub enum HandlerMessage { /// An RPC response/request has been received. RPC(PeerId, RPCEvent), /// A gossip message has been received. - IncomingGossip(PeerId, IncomingGossip), + PubsubMessage(PeerId, PubsubMessage), } impl MessageHandler { @@ -92,7 +92,7 @@ impl MessageHandler { self.handle_rpc_message(peer_id, rpc_event); } // we have received an RPC message request/response - HandlerMessage::IncomingGossip(peer_id, gossip) => { + HandlerMessage::PubsubMessage(peer_id, gossip) => { self.handle_gossip(peer_id, gossip); } //TODO: Handle all messages @@ -205,13 +205,13 @@ impl MessageHandler { } /// Handle RPC messages - fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: IncomingGossip) { + fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { match gossip_message { - IncomingGossip::Block(message) => { + PubsubMessage::Block(message) => { self.sync .on_block_gossip(peer_id, message, &mut self.network_context) } - IncomingGossip::Attestation(message) => { + PubsubMessage::Attestation(message) => { self.sync .on_attestation_gossip(peer_id, message, &mut self.network_context) } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 33ea79c1a..55c43e4ec 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -3,16 +3,16 @@ use crate::error; use crate::message_handler::{HandlerMessage, MessageHandler}; use crate::NetworkConfig; use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; -use eth2_libp2p::{RPCEvent, PublishMessage}; use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::{Libp2pEvent, PeerId}; +use eth2_libp2p::{PubsubMessage, RPCEvent}; use futures::prelude::*; use futures::sync::oneshot; use futures::Stream; use slog::{debug, info, o, trace}; use std::sync::Arc; use tokio::runtime::TaskExecutor; -use types::{BeaconBlock, Topic}; +use types::Topic; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { @@ -100,6 +100,7 @@ fn spawn_service( Ok(network_exit) } +//TODO: Potentially handle channel errors fn network_service( mut libp2p_service: LibP2PService, network_recv: crossbeam_channel::Receiver, @@ -129,10 +130,17 @@ fn network_service( "We have identified peer: {:?} with {:?}", peer_id, info ); } - Libp2pEvent::Message(m) => debug!( - libp2p_service.log, - "Network Service: Message received: {}", m - ), + Libp2pEvent::PubsubMessage { + source, + topics: _, + message, + } => { + //TODO: Decide if we need to propagate the topic upwards. (Potentially for + //attestations) + message_handler_send + .send(HandlerMessage::PubsubMessage(source, message)) + .map_err(|_| " failed to send pubsub message to handler")?; + } }, Ok(Async::Ready(None)) => unreachable!("Stream never ends"), Ok(Async::NotReady) => break, @@ -157,17 +165,16 @@ fn network_service( } }; } + Ok(NetworkMessage::Publish(topic, message)) => { + debug!(log, "Sending pubsub message on topic {:?}", topic); + libp2p_service.swarm.publish(topic, message); + } Err(TryRecvError::Empty) => break, Err(TryRecvError::Disconnected) => { return Err(eth2_libp2p::error::Error::from( "Network channel disconnected", )); - }, - Ok(NetworkMessage::Publish(topic, message) => { - debug!(log, "Sending message on topic {:?}", topic); - libp2p_service.swarm.publish(topic,message) - - + } } } Ok(Async::NotReady) @@ -180,8 +187,8 @@ pub enum NetworkMessage { /// Send a message to libp2p service. //TODO: Define typing for messages across the wire Send(PeerId, OutgoingMessage), - /// Publish a message to gossipsub - Publish(Topic, PublishMessage), + /// Publish a message to pubsub mechanism. + Publish(Topic, PubsubMessage), } /// Type of outgoing messages that can be sent through the network service. @@ -192,4 +199,3 @@ pub enum OutgoingMessage { //TODO: Remove NotifierTest, } - diff --git a/beacon_node/rpc/Cargo.toml b/beacon_node/rpc/Cargo.toml index d405982db..e9709c1ce 100644 --- a/beacon_node/rpc/Cargo.toml +++ b/beacon_node/rpc/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] bls = { path = "../../eth2/utils/bls" } beacon_chain = { path = "../beacon_chain" } +network = { path = "../network" } version = { path = "../version" } types = { path = "../../eth2/types" } ssz = { path = "../../eth2/utils/ssz" } @@ -23,3 +24,4 @@ slog-term = "^2.4.0" slog-async = "^2.3.0" tokio = "0.1.17" exit-future = "0.1.4" +crossbeam-channel = "0.3.8" diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index 9169d695d..d124152f1 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -6,6 +6,8 @@ use protos::services::{ }; use protos::services_grpc::BeaconBlockService; use slog::Logger; +use crossbeam_channel; +use network::NetworkMessage; #[derive(Clone)] pub struct BeaconBlockServiceInstance { @@ -48,8 +50,8 @@ impl BeaconBlockService for BeaconBlockServiceInstance { println!("publishing {:?}", block); - // TODO: Build properly - let topic = types::TopicBuilder:: + // TODO: Obtain from the network properly. + let topic = types::TopicBuilder::from("beacon_chain").build(); println!("Sending beacon block to gossipsub"); network_chan.send(NetworkMessage::Publish( diff --git a/eth2/types/src/lib.rs b/eth2/types/src/lib.rs index 953a9508f..118e862e8 100644 --- a/eth2/types/src/lib.rs +++ b/eth2/types/src/lib.rs @@ -85,6 +85,6 @@ pub type AttesterMap = HashMap<(u64, u64), Vec>; pub type ProposerMap = HashMap; pub use bls::{AggregatePublicKey, AggregateSignature, Keypair, PublicKey, SecretKey, Signature}; -pub use libp2p::floodsub::{Topic, TopicBuilder}; +pub use libp2p::floodsub::{Topic, TopicBuilder, TopicHash}; pub use libp2p::multiaddr; pub use libp2p::Multiaddr;