diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 44eab4fe2..b24d2cb7f 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -46,7 +46,7 @@ impl Client { // TODO: Add beacon_chain reference to network parameters let network_config = &config.net_conf; let network_logger = log.new(o!("Service" => "Network")); - let (network, _network_send) = NetworkService::new( + let (network, network_send) = NetworkService::new( beacon_chain.clone(), network_config, executor, @@ -59,6 +59,7 @@ impl Client { rpc_exit_signal = Some(rpc::start_server( &config.rpc_conf, executor, + network_send, beacon_chain.clone(), &log, )); diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 3d5b94353..b3c4213b1 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -14,6 +14,7 @@ use libp2p::{ NetworkBehaviour, PeerId, }; use slog::{debug, o}; +use ssz::{ssz_encode, Decodable, DecodeError, Encodable, SszStream}; use ssz_derive::{Decode, Encode}; use types::Attestation; use types::Topic; @@ -124,6 +125,15 @@ impl Behaviour { } } + /* Behaviour functions */ + + /// Publishes a message on the pubsub (gossipsub) behaviour. + pub fn publish(&mut self, topic: Topic, message: PubsubMessage) { + //encode the message + let message_bytes = ssz_encode(&message); + self.gossipsub.publish(topic, message_bytes); + } + /// Consumes the events list when polled. fn poll( &mut self, @@ -158,12 +168,6 @@ pub enum BehaviourEvent { Message(String), } -#[derive(Debug, Clone)] -pub enum IncomingGossip { - Block(BlockGossip), - Attestation(AttestationGossip), -} - /// Gossipsub message providing notification of a new block. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BlockGossip { @@ -175,3 +179,43 @@ pub struct BlockGossip { pub struct AttestationGossip { pub attestation: Attestation, } + +/// Messages that are passed to and from the pubsub (Gossipsub) behaviour. +#[derive(Debug, Clone)] +pub enum PubsubMessage { + Block(BlockGossip), + Attestation(AttestationGossip), +} + +//TODO: Correctly encode/decode enums. Prefixing with integer for now. +impl Encodable for PubsubMessage { + fn ssz_append(&self, s: &mut SszStream) { + match self { + PubsubMessage::Block(block_gossip) => { + 0u32.ssz_append(s); + block_gossip.ssz_append(s); + } + PubsubMessage::Attestation(attestation_gossip) => { + 1u32.ssz_append(s); + attestation_gossip.ssz_append(s); + } + } + } +} + +impl Decodable for PubsubMessage { + fn ssz_decode(bytes: &[u8], index: usize) -> Result<(Self, usize), DecodeError> { + let (id, index) = u32::ssz_decode(bytes, index)?; + match id { + 1 => { + let (block, index) = BlockGossip::ssz_decode(bytes, index)?; + Ok((PubsubMessage::Block(block), index)) + } + 2 => { + let (attestation, index) = AttestationGossip::ssz_decode(bytes, index)?; + Ok((PubsubMessage::Attestation(attestation), index)) + } + _ => Err(DecodeError::Invalid), + } + } +} diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs index 26cea0065..c627912a4 100644 --- a/beacon_node/network/src/beacon_chain.rs +++ b/beacon_node/network/src/beacon_chain.rs @@ -109,7 +109,7 @@ where let state = self.get_state(); HelloMessage { - network_id: spec.network_id, + network_id: spec.chain_id, latest_finalized_root: state.finalized_root, latest_finalized_epoch: state.finalized_epoch, best_root: self.best_block_root(), diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index a3eb6f0d9..33ea79c1a 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -3,7 +3,7 @@ use crate::error; use crate::message_handler::{HandlerMessage, MessageHandler}; use crate::NetworkConfig; use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; -use eth2_libp2p::RPCEvent; +use eth2_libp2p::{RPCEvent, PublishMessage}; use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::{Libp2pEvent, PeerId}; use futures::prelude::*; @@ -12,6 +12,7 @@ use futures::Stream; use slog::{debug, info, o, trace}; use std::sync::Arc; use tokio::runtime::TaskExecutor; +use types::{BeaconBlock, Topic}; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { @@ -161,7 +162,12 @@ fn network_service( return Err(eth2_libp2p::error::Error::from( "Network channel disconnected", )); - } + }, + Ok(NetworkMessage::Publish(topic, message) => { + debug!(log, "Sending message on topic {:?}", topic); + libp2p_service.swarm.publish(topic,message) + + } } Ok(Async::NotReady) @@ -174,6 +180,8 @@ pub enum NetworkMessage { /// Send a message to libp2p service. //TODO: Define typing for messages across the wire Send(PeerId, OutgoingMessage), + /// Publish a message to gossipsub + Publish(Topic, PublishMessage), } /// Type of outgoing messages that can be sent through the network service. @@ -184,3 +192,4 @@ pub enum OutgoingMessage { //TODO: Remove NotifierTest, } + diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index 96f64e0dd..9169d695d 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -9,6 +9,7 @@ use slog::Logger; #[derive(Clone)] pub struct BeaconBlockServiceInstance { + network_chan: crossbeam_channel::Sender, pub log: Logger, } @@ -43,7 +44,15 @@ impl BeaconBlockService for BeaconBlockServiceInstance { req: PublishBeaconBlockRequest, sink: UnarySink, ) { - println!("publishing {:?}", req.get_block()); + let block = req.get_block(); + println!("publishing {:?}", block); + + + // TODO: Build properly + let topic = types::TopicBuilder:: + println!("Sending beacon block to gossipsub"); + network_chan.send(NetworkMessage::Publish( + // TODO: actually process the block. let mut resp = PublishBeaconBlockResponse::new(); diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index 3c89bda1f..e1267270c 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -21,6 +21,7 @@ use tokio::runtime::TaskExecutor; pub fn start_server( config: &RPCConfig, executor: &TaskExecutor, + network_chan: crossbeam_channel::Sender, beacon_chain: Arc, log: &slog::Logger, ) -> exit_future::Signal { @@ -40,7 +41,9 @@ pub fn start_server( }; let beacon_block_service = { - let instance = BeaconBlockServiceInstance { log: log.clone() }; + let instance = BeaconBlockServiceInstance { + network_chan + log: log.clone() }; create_beacon_block_service(instance) }; let validator_service = {