Add PubsubMessage and publish function to behaviour

This commit is contained in:
Age Manning 2019-03-25 22:00:11 +11:00
parent 214fb5b8ff
commit 05369df7e8
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
6 changed files with 78 additions and 12 deletions

View File

@ -46,7 +46,7 @@ impl<TClientType: ClientTypes> Client<TClientType> {
// TODO: Add beacon_chain reference to network parameters // TODO: Add beacon_chain reference to network parameters
let network_config = &config.net_conf; let network_config = &config.net_conf;
let network_logger = log.new(o!("Service" => "Network")); let network_logger = log.new(o!("Service" => "Network"));
let (network, _network_send) = NetworkService::new( let (network, network_send) = NetworkService::new(
beacon_chain.clone(), beacon_chain.clone(),
network_config, network_config,
executor, executor,
@ -59,6 +59,7 @@ impl<TClientType: ClientTypes> Client<TClientType> {
rpc_exit_signal = Some(rpc::start_server( rpc_exit_signal = Some(rpc::start_server(
&config.rpc_conf, &config.rpc_conf,
executor, executor,
network_send,
beacon_chain.clone(), beacon_chain.clone(),
&log, &log,
)); ));

View File

@ -14,6 +14,7 @@ use libp2p::{
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };
use slog::{debug, o}; use slog::{debug, o};
use ssz::{ssz_encode, Decodable, DecodeError, Encodable, SszStream};
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
use types::Attestation; use types::Attestation;
use types::Topic; use types::Topic;
@ -124,6 +125,15 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
} }
} }
/* Behaviour functions */
/// Publishes a message on the pubsub (gossipsub) behaviour.
pub fn publish(&mut self, topic: Topic, message: PubsubMessage) {
//encode the message
let message_bytes = ssz_encode(&message);
self.gossipsub.publish(topic, message_bytes);
}
/// Consumes the events list when polled. /// Consumes the events list when polled.
fn poll<TBehaviourIn>( fn poll<TBehaviourIn>(
&mut self, &mut self,
@ -158,12 +168,6 @@ pub enum BehaviourEvent {
Message(String), Message(String),
} }
#[derive(Debug, Clone)]
pub enum IncomingGossip {
Block(BlockGossip),
Attestation(AttestationGossip),
}
/// Gossipsub message providing notification of a new block. /// Gossipsub message providing notification of a new block.
#[derive(Encode, Decode, Clone, Debug, PartialEq)] #[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BlockGossip { pub struct BlockGossip {
@ -175,3 +179,43 @@ pub struct BlockGossip {
pub struct AttestationGossip { pub struct AttestationGossip {
pub attestation: Attestation, pub attestation: Attestation,
} }
/// Messages that are passed to and from the pubsub (Gossipsub) behaviour.
#[derive(Debug, Clone)]
pub enum PubsubMessage {
Block(BlockGossip),
Attestation(AttestationGossip),
}
//TODO: Correctly encode/decode enums. Prefixing with integer for now.
impl Encodable for PubsubMessage {
fn ssz_append(&self, s: &mut SszStream) {
match self {
PubsubMessage::Block(block_gossip) => {
0u32.ssz_append(s);
block_gossip.ssz_append(s);
}
PubsubMessage::Attestation(attestation_gossip) => {
1u32.ssz_append(s);
attestation_gossip.ssz_append(s);
}
}
}
}
impl Decodable for PubsubMessage {
fn ssz_decode(bytes: &[u8], index: usize) -> Result<(Self, usize), DecodeError> {
let (id, index) = u32::ssz_decode(bytes, index)?;
match id {
1 => {
let (block, index) = BlockGossip::ssz_decode(bytes, index)?;
Ok((PubsubMessage::Block(block), index))
}
2 => {
let (attestation, index) = AttestationGossip::ssz_decode(bytes, index)?;
Ok((PubsubMessage::Attestation(attestation), index))
}
_ => Err(DecodeError::Invalid),
}
}
}

View File

@ -109,7 +109,7 @@ where
let state = self.get_state(); let state = self.get_state();
HelloMessage { HelloMessage {
network_id: spec.network_id, network_id: spec.chain_id,
latest_finalized_root: state.finalized_root, latest_finalized_root: state.finalized_root,
latest_finalized_epoch: state.finalized_epoch, latest_finalized_epoch: state.finalized_epoch,
best_root: self.best_block_root(), best_root: self.best_block_root(),

View File

@ -3,7 +3,7 @@ use crate::error;
use crate::message_handler::{HandlerMessage, MessageHandler}; use crate::message_handler::{HandlerMessage, MessageHandler};
use crate::NetworkConfig; use crate::NetworkConfig;
use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; use crossbeam_channel::{unbounded as channel, Sender, TryRecvError};
use eth2_libp2p::RPCEvent; use eth2_libp2p::{RPCEvent, PublishMessage};
use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{Libp2pEvent, PeerId}; use eth2_libp2p::{Libp2pEvent, PeerId};
use futures::prelude::*; use futures::prelude::*;
@ -12,6 +12,7 @@ use futures::Stream;
use slog::{debug, info, o, trace}; use slog::{debug, info, o, trace};
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::TaskExecutor; use tokio::runtime::TaskExecutor;
use types::{BeaconBlock, Topic};
/// Service that handles communication between internal services and the eth2_libp2p network service. /// Service that handles communication between internal services and the eth2_libp2p network service.
pub struct Service { pub struct Service {
@ -161,7 +162,12 @@ fn network_service(
return Err(eth2_libp2p::error::Error::from( return Err(eth2_libp2p::error::Error::from(
"Network channel disconnected", "Network channel disconnected",
)); ));
} },
Ok(NetworkMessage::Publish(topic, message) => {
debug!(log, "Sending message on topic {:?}", topic);
libp2p_service.swarm.publish(topic,message)
} }
} }
Ok(Async::NotReady) Ok(Async::NotReady)
@ -174,6 +180,8 @@ pub enum NetworkMessage {
/// Send a message to libp2p service. /// Send a message to libp2p service.
//TODO: Define typing for messages across the wire //TODO: Define typing for messages across the wire
Send(PeerId, OutgoingMessage), Send(PeerId, OutgoingMessage),
/// Publish a message to gossipsub
Publish(Topic, PublishMessage),
} }
/// Type of outgoing messages that can be sent through the network service. /// Type of outgoing messages that can be sent through the network service.
@ -184,3 +192,4 @@ pub enum OutgoingMessage {
//TODO: Remove //TODO: Remove
NotifierTest, NotifierTest,
} }

View File

@ -9,6 +9,7 @@ use slog::Logger;
#[derive(Clone)] #[derive(Clone)]
pub struct BeaconBlockServiceInstance { pub struct BeaconBlockServiceInstance {
network_chan: crossbeam_channel::Sender<NetworkMessage>,
pub log: Logger, pub log: Logger,
} }
@ -43,7 +44,15 @@ impl BeaconBlockService for BeaconBlockServiceInstance {
req: PublishBeaconBlockRequest, req: PublishBeaconBlockRequest,
sink: UnarySink<PublishBeaconBlockResponse>, sink: UnarySink<PublishBeaconBlockResponse>,
) { ) {
println!("publishing {:?}", req.get_block()); let block = req.get_block();
println!("publishing {:?}", block);
// TODO: Build properly
let topic = types::TopicBuilder::
println!("Sending beacon block to gossipsub");
network_chan.send(NetworkMessage::Publish(
// TODO: actually process the block. // TODO: actually process the block.
let mut resp = PublishBeaconBlockResponse::new(); let mut resp = PublishBeaconBlockResponse::new();

View File

@ -21,6 +21,7 @@ use tokio::runtime::TaskExecutor;
pub fn start_server( pub fn start_server(
config: &RPCConfig, config: &RPCConfig,
executor: &TaskExecutor, executor: &TaskExecutor,
network_chan: crossbeam_channel::Sender<NetworkMessage>,
beacon_chain: Arc<BeaconChain>, beacon_chain: Arc<BeaconChain>,
log: &slog::Logger, log: &slog::Logger,
) -> exit_future::Signal { ) -> exit_future::Signal {
@ -40,7 +41,9 @@ pub fn start_server(
}; };
let beacon_block_service = { let beacon_block_service = {
let instance = BeaconBlockServiceInstance { log: log.clone() }; let instance = BeaconBlockServiceInstance {
network_chan
log: log.clone() };
create_beacon_block_service(instance) create_beacon_block_service(instance)
}; };
let validator_service = { let validator_service = {