Implement initial pubsub message handling

This commit is contained in:
Age Manning 2019-03-25 23:02:51 +11:00
parent 05369df7e8
commit 52b31b2009
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
9 changed files with 95 additions and 51 deletions

View File

@ -13,11 +13,11 @@ use libp2p::{
tokio_io::{AsyncRead, AsyncWrite}, tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };
use slog::{debug, o}; use slog::{debug, o, warn};
use ssz::{ssz_encode, Decodable, DecodeError, Encodable, SszStream}; use ssz::{ssz_encode, Decodable, DecodeError, Encodable, SszStream};
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
use types::Attestation; use types::Attestation;
use types::Topic; use types::{Topic, TopicHash};
/// Builds the network behaviour for the libp2p Swarm. /// Builds the network behaviour for the libp2p Swarm.
/// Implements gossipsub message routing. /// Implements gossipsub message routing.
@ -48,13 +48,33 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubE
{ {
fn inject_event(&mut self, event: GossipsubEvent) { fn inject_event(&mut self, event: GossipsubEvent) {
match event { match event {
GossipsubEvent::Message(message) => { GossipsubEvent::Message(gs_msg) => {
let gs_message = String::from_utf8_lossy(&message.data); let pubsub_message = match PubsubMessage::ssz_decode(&gs_msg.data, 0) {
// TODO: Remove this type - debug only //TODO: Punish peer on error
self.events Err(e) => {
.push(BehaviourEvent::Message(gs_message.to_string())) warn!(
self.log,
"Received undecodable message from Peer {:?}", gs_msg.source
);
return;
} }
_ => {} Ok((msg, _index)) => msg,
};
self.events.push(BehaviourEvent::GossipMessage {
source: gs_msg.source,
topics: gs_msg.topics,
message: pubsub_message,
});
}
GossipsubEvent::Subscribed {
peer_id: _,
topic: _,
}
| GossipsubEvent::Unsubscribed {
peer_id: _,
topic: _,
} => {}
} }
} }
} }
@ -125,15 +145,6 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
} }
} }
/* Behaviour functions */
/// Publishes a message on the pubsub (gossipsub) behaviour.
pub fn publish(&mut self, topic: Topic, message: PubsubMessage) {
//encode the message
let message_bytes = ssz_encode(&message);
self.gossipsub.publish(topic, message_bytes);
}
/// Consumes the events list when polled. /// Consumes the events list when polled.
fn poll<TBehaviourIn>( fn poll<TBehaviourIn>(
&mut self, &mut self,
@ -157,6 +168,12 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.serenity_rpc.send_rpc(peer_id, rpc_event); self.serenity_rpc.send_rpc(peer_id, rpc_event);
} }
/// Publishes a message on the pubsub (gossipsub) behaviour.
pub fn publish(&mut self, topic: Topic, message: PubsubMessage) {
let message_bytes = ssz_encode(&message);
self.gossipsub.publish(topic, message_bytes);
}
} }
/// The types of events than can be obtained from polling the behaviour. /// The types of events than can be obtained from polling the behaviour.
@ -165,7 +182,11 @@ pub enum BehaviourEvent {
PeerDialed(PeerId), PeerDialed(PeerId),
Identified(PeerId, IdentifyInfo), Identified(PeerId, IdentifyInfo),
// TODO: This is a stub at the moment // TODO: This is a stub at the moment
Message(String), GossipMessage {
source: PeerId,
topics: Vec<TopicHash>,
message: PubsubMessage,
},
} }
/// Gossipsub message providing notification of a new block. /// Gossipsub message providing notification of a new block.

View File

@ -8,12 +8,13 @@ pub mod error;
pub mod rpc; pub mod rpc;
mod service; mod service;
pub use behaviour::PubsubMessage;
pub use config::Config as NetworkConfig; pub use config::Config as NetworkConfig;
pub use libp2p::{ pub use libp2p::{
gossipsub::{GossipsubConfig, GossipsubConfigBuilder}, gossipsub::{GossipsubConfig, GossipsubConfigBuilder},
PeerId, PeerId,
}; };
pub use rpc::{HelloMessage, RPCEvent}; pub use rpc::RPCEvent;
pub use service::Libp2pEvent; pub use service::Libp2pEvent;
pub use service::Service; pub use service::Service;
pub use types::multiaddr; pub use types::multiaddr;

View File

@ -1,4 +1,4 @@
use crate::behaviour::{Behaviour, BehaviourEvent}; use crate::behaviour::{Behaviour, BehaviourEvent, PubsubMessage};
use crate::error; use crate::error;
use crate::multiaddr::Protocol; use crate::multiaddr::Protocol;
use crate::rpc::RPCEvent; use crate::rpc::RPCEvent;
@ -16,7 +16,7 @@ use libp2p::{core, secio, PeerId, Swarm, Transport};
use slog::{debug, info, trace, warn}; use slog::{debug, info, trace, warn};
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind};
use std::time::Duration; use std::time::Duration;
use types::TopicBuilder; use types::{TopicBuilder, TopicHash};
/// The configuration and state of the libp2p components for the beacon node. /// The configuration and state of the libp2p components for the beacon node.
pub struct Service { pub struct Service {
@ -107,9 +107,17 @@ impl Stream for Service {
//Behaviour events //Behaviour events
Ok(Async::Ready(Some(event))) => match event { Ok(Async::Ready(Some(event))) => match event {
// TODO: Stub here for debugging // TODO: Stub here for debugging
BehaviourEvent::Message(m) => { BehaviourEvent::GossipMessage {
debug!(self.log, "Message received: {}", m); source,
return Ok(Async::Ready(Some(Libp2pEvent::Message(m)))); topics,
message,
} => {
debug!(self.log, "Pubsub message received: {:?}", message);
return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage {
source,
topics,
message,
})));
} }
BehaviourEvent::RPC(peer_id, event) => { BehaviourEvent::RPC(peer_id, event) => {
return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event)))); return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))));
@ -173,6 +181,10 @@ pub enum Libp2pEvent {
PeerDialed(PeerId), PeerDialed(PeerId),
/// Received information about a peer on the network. /// Received information about a peer on the network.
Identified(PeerId, IdentifyInfo), Identified(PeerId, IdentifyInfo),
// TODO: Pub-sub testing only. /// Received pubsub message.
Message(String), PubsubMessage {
source: PeerId,
topics: Vec<TopicHash>,
message: PubsubMessage,
},
} }

View File

@ -7,7 +7,7 @@ use beacon_chain::{
types::{BeaconState, ChainSpec}, types::{BeaconState, ChainSpec},
AggregationOutcome, CheckPoint, AggregationOutcome, CheckPoint,
}; };
use eth2_libp2p::HelloMessage; use eth2_libp2p::rpc::HelloMessage;
use types::{Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; use types::{Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot};
pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome}; pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome};

View File

@ -4,7 +4,7 @@ use crate::service::{NetworkMessage, OutgoingMessage};
use crate::sync::SimpleSync; use crate::sync::SimpleSync;
use crossbeam_channel::{unbounded as channel, Sender}; use crossbeam_channel::{unbounded as channel, Sender};
use eth2_libp2p::{ use eth2_libp2p::{
behaviour::IncomingGossip, behaviour::PubsubMessage,
rpc::{methods::GoodbyeReason, RPCRequest, RPCResponse, RequestId}, rpc::{methods::GoodbyeReason, RPCRequest, RPCResponse, RequestId},
PeerId, RPCEvent, PeerId, RPCEvent,
}; };
@ -41,7 +41,7 @@ pub enum HandlerMessage {
/// An RPC response/request has been received. /// An RPC response/request has been received.
RPC(PeerId, RPCEvent), RPC(PeerId, RPCEvent),
/// A gossip message has been received. /// A gossip message has been received.
IncomingGossip(PeerId, IncomingGossip), PubsubMessage(PeerId, PubsubMessage),
} }
impl MessageHandler { impl MessageHandler {
@ -92,7 +92,7 @@ impl MessageHandler {
self.handle_rpc_message(peer_id, rpc_event); self.handle_rpc_message(peer_id, rpc_event);
} }
// we have received an RPC message request/response // we have received an RPC message request/response
HandlerMessage::IncomingGossip(peer_id, gossip) => { HandlerMessage::PubsubMessage(peer_id, gossip) => {
self.handle_gossip(peer_id, gossip); self.handle_gossip(peer_id, gossip);
} }
//TODO: Handle all messages //TODO: Handle all messages
@ -205,13 +205,13 @@ impl MessageHandler {
} }
/// Handle RPC messages /// Handle RPC messages
fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: IncomingGossip) { fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) {
match gossip_message { match gossip_message {
IncomingGossip::Block(message) => { PubsubMessage::Block(message) => {
self.sync self.sync
.on_block_gossip(peer_id, message, &mut self.network_context) .on_block_gossip(peer_id, message, &mut self.network_context)
} }
IncomingGossip::Attestation(message) => { PubsubMessage::Attestation(message) => {
self.sync self.sync
.on_attestation_gossip(peer_id, message, &mut self.network_context) .on_attestation_gossip(peer_id, message, &mut self.network_context)
} }

View File

@ -3,16 +3,16 @@ use crate::error;
use crate::message_handler::{HandlerMessage, MessageHandler}; use crate::message_handler::{HandlerMessage, MessageHandler};
use crate::NetworkConfig; use crate::NetworkConfig;
use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; use crossbeam_channel::{unbounded as channel, Sender, TryRecvError};
use eth2_libp2p::{RPCEvent, PublishMessage};
use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{Libp2pEvent, PeerId}; use eth2_libp2p::{Libp2pEvent, PeerId};
use eth2_libp2p::{PubsubMessage, RPCEvent};
use futures::prelude::*; use futures::prelude::*;
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::Stream; use futures::Stream;
use slog::{debug, info, o, trace}; use slog::{debug, info, o, trace};
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::TaskExecutor; use tokio::runtime::TaskExecutor;
use types::{BeaconBlock, Topic}; use types::Topic;
/// Service that handles communication between internal services and the eth2_libp2p network service. /// Service that handles communication between internal services and the eth2_libp2p network service.
pub struct Service { pub struct Service {
@ -100,6 +100,7 @@ fn spawn_service(
Ok(network_exit) Ok(network_exit)
} }
//TODO: Potentially handle channel errors
fn network_service( fn network_service(
mut libp2p_service: LibP2PService, mut libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>, network_recv: crossbeam_channel::Receiver<NetworkMessage>,
@ -129,10 +130,17 @@ fn network_service(
"We have identified peer: {:?} with {:?}", peer_id, info "We have identified peer: {:?} with {:?}", peer_id, info
); );
} }
Libp2pEvent::Message(m) => debug!( Libp2pEvent::PubsubMessage {
libp2p_service.log, source,
"Network Service: Message received: {}", m topics: _,
), message,
} => {
//TODO: Decide if we need to propagate the topic upwards. (Potentially for
//attestations)
message_handler_send
.send(HandlerMessage::PubsubMessage(source, message))
.map_err(|_| " failed to send pubsub message to handler")?;
}
}, },
Ok(Async::Ready(None)) => unreachable!("Stream never ends"), Ok(Async::Ready(None)) => unreachable!("Stream never ends"),
Ok(Async::NotReady) => break, Ok(Async::NotReady) => break,
@ -157,17 +165,16 @@ fn network_service(
} }
}; };
} }
Ok(NetworkMessage::Publish(topic, message)) => {
debug!(log, "Sending pubsub message on topic {:?}", topic);
libp2p_service.swarm.publish(topic, message);
}
Err(TryRecvError::Empty) => break, Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => { Err(TryRecvError::Disconnected) => {
return Err(eth2_libp2p::error::Error::from( return Err(eth2_libp2p::error::Error::from(
"Network channel disconnected", "Network channel disconnected",
)); ));
}, }
Ok(NetworkMessage::Publish(topic, message) => {
debug!(log, "Sending message on topic {:?}", topic);
libp2p_service.swarm.publish(topic,message)
} }
} }
Ok(Async::NotReady) Ok(Async::NotReady)
@ -180,8 +187,8 @@ pub enum NetworkMessage {
/// Send a message to libp2p service. /// Send a message to libp2p service.
//TODO: Define typing for messages across the wire //TODO: Define typing for messages across the wire
Send(PeerId, OutgoingMessage), Send(PeerId, OutgoingMessage),
/// Publish a message to gossipsub /// Publish a message to pubsub mechanism.
Publish(Topic, PublishMessage), Publish(Topic, PubsubMessage),
} }
/// Type of outgoing messages that can be sent through the network service. /// Type of outgoing messages that can be sent through the network service.
@ -192,4 +199,3 @@ pub enum OutgoingMessage {
//TODO: Remove //TODO: Remove
NotifierTest, NotifierTest,
} }

View File

@ -7,6 +7,7 @@ edition = "2018"
[dependencies] [dependencies]
bls = { path = "../../eth2/utils/bls" } bls = { path = "../../eth2/utils/bls" }
beacon_chain = { path = "../beacon_chain" } beacon_chain = { path = "../beacon_chain" }
network = { path = "../network" }
version = { path = "../version" } version = { path = "../version" }
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }
ssz = { path = "../../eth2/utils/ssz" } ssz = { path = "../../eth2/utils/ssz" }
@ -23,3 +24,4 @@ slog-term = "^2.4.0"
slog-async = "^2.3.0" slog-async = "^2.3.0"
tokio = "0.1.17" tokio = "0.1.17"
exit-future = "0.1.4" exit-future = "0.1.4"
crossbeam-channel = "0.3.8"

View File

@ -6,6 +6,8 @@ use protos::services::{
}; };
use protos::services_grpc::BeaconBlockService; use protos::services_grpc::BeaconBlockService;
use slog::Logger; use slog::Logger;
use crossbeam_channel;
use network::NetworkMessage;
#[derive(Clone)] #[derive(Clone)]
pub struct BeaconBlockServiceInstance { pub struct BeaconBlockServiceInstance {
@ -48,8 +50,8 @@ impl BeaconBlockService for BeaconBlockServiceInstance {
println!("publishing {:?}", block); println!("publishing {:?}", block);
// TODO: Build properly // TODO: Obtain from the network properly.
let topic = types::TopicBuilder:: let topic = types::TopicBuilder::from("beacon_chain").build();
println!("Sending beacon block to gossipsub"); println!("Sending beacon block to gossipsub");
network_chan.send(NetworkMessage::Publish( network_chan.send(NetworkMessage::Publish(

View File

@ -85,6 +85,6 @@ pub type AttesterMap = HashMap<(u64, u64), Vec<usize>>;
pub type ProposerMap = HashMap<u64, usize>; pub type ProposerMap = HashMap<u64, usize>;
pub use bls::{AggregatePublicKey, AggregateSignature, Keypair, PublicKey, SecretKey, Signature}; pub use bls::{AggregatePublicKey, AggregateSignature, Keypair, PublicKey, SecretKey, Signature};
pub use libp2p::floodsub::{Topic, TopicBuilder}; pub use libp2p::floodsub::{Topic, TopicBuilder, TopicHash};
pub use libp2p::multiaddr; pub use libp2p::multiaddr;
pub use libp2p::Multiaddr; pub use libp2p::Multiaddr;