Fix compile issues and modify type names

This commit is contained in:
Age Manning 2019-03-25 23:39:39 +11:00
parent 52b31b2009
commit f7131c2f87
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
7 changed files with 51 additions and 43 deletions

View File

@ -170,9 +170,11 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
}
/// Publishes a message on the pubsub (gossipsub) behaviour.
pub fn publish(&mut self, topic: Topic, message: PubsubMessage) {
pub fn publish(&mut self, topics: Vec<Topic>, message: PubsubMessage) {
let message_bytes = ssz_encode(&message);
self.gossipsub.publish(topic, message_bytes);
for topic in topics {
self.gossipsub.publish(topic, message_bytes.clone());
}
}
}
@ -189,23 +191,13 @@ pub enum BehaviourEvent {
},
}
/// Gossipsub message providing notification of a new block.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BlockGossip {
pub root: BlockRootSlot,
}
/// Gossipsub message providing notification of a new attestation.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct AttestationGossip {
pub attestation: Attestation,
}
/// Messages that are passed to and from the pubsub (Gossipsub) behaviour.
#[derive(Debug, Clone)]
pub enum PubsubMessage {
Block(BlockGossip),
Attestation(AttestationGossip),
/// Gossipsub message providing notification of a new block.
Block(BlockRootSlot),
/// Gossipsub message providing notification of a new attestation.
Attestation(Attestation),
}
//TODO: Correctly encode/decode enums. Prefixing with integer for now.
@ -229,11 +221,11 @@ impl Decodable for PubsubMessage {
let (id, index) = u32::ssz_decode(bytes, index)?;
match id {
1 => {
let (block, index) = BlockGossip::ssz_decode(bytes, index)?;
let (block, index) = BlockRootSlot::ssz_decode(bytes, index)?;
Ok((PubsubMessage::Block(block), index))
}
2 => {
let (attestation, index) = AttestationGossip::ssz_decode(bytes, index)?;
let (attestation, index) = Attestation::ssz_decode(bytes, index)?;
Ok((PubsubMessage::Attestation(attestation), index))
}
_ => Err(DecodeError::Invalid),

View File

@ -6,4 +6,5 @@ pub mod service;
pub mod sync;
pub use eth2_libp2p::NetworkConfig;
pub use service::NetworkMessage;
pub use service::Service;

View File

@ -165,9 +165,9 @@ fn network_service(
}
};
}
Ok(NetworkMessage::Publish(topic, message)) => {
debug!(log, "Sending pubsub message on topic {:?}", topic);
libp2p_service.swarm.publish(topic, message);
Ok(NetworkMessage::Publish { topics, message }) => {
debug!(log, "Sending pubsub message on topics {:?}", topics);
libp2p_service.swarm.publish(topics, message);
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
@ -188,7 +188,10 @@ pub enum NetworkMessage {
//TODO: Define typing for messages across the wire
Send(PeerId, OutgoingMessage),
/// Publish a message to pubsub mechanism.
Publish(Topic, PubsubMessage),
Publish {
topics: Vec<Topic>,
message: PubsubMessage,
},
}
/// Type of outgoing messages that can be sent through the network service.

View File

@ -1,7 +1,6 @@
use super::import_queue::ImportQueue;
use crate::beacon_chain::BeaconChain;
use crate::message_handler::NetworkContext;
use eth2_libp2p::behaviour::{AttestationGossip, BlockGossip};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::PeerId;
@ -9,7 +8,7 @@ use slog::{debug, error, info, o, warn};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use types::{Epoch, Hash256, Slot};
use types::{Attestation, Epoch, Hash256, Slot};
/// The number of slots that we can import blocks ahead of us, before going into full Sync mode.
const SLOT_IMPORT_TOLERANCE: u64 = 100;
@ -521,12 +520,12 @@ impl SimpleSync {
pub fn on_block_gossip(
&mut self,
peer_id: PeerId,
msg: BlockGossip,
msg: BlockRootSlot,
network: &mut NetworkContext,
) {
debug!(
self.log,
"BlockGossip";
"BlockSlot";
"peer" => format!("{:?}", peer_id),
);
// TODO: filter out messages that a prior to the finalized slot.
@ -535,12 +534,12 @@ impl SimpleSync {
// now.
//
// Note: only requests the new block -- will fail if we don't have its parents.
if self.import_queue.is_new_block(&msg.root.block_root) {
if self.import_queue.is_new_block(&msg.block_root) {
self.request_block_headers(
peer_id,
BeaconBlockHeadersRequest {
start_root: msg.root.block_root,
start_slot: msg.root.slot,
start_root: msg.block_root,
start_slot: msg.slot,
max_headers: 1,
skip_slots: 0,
},
@ -555,19 +554,19 @@ impl SimpleSync {
pub fn on_attestation_gossip(
&mut self,
peer_id: PeerId,
msg: AttestationGossip,
msg: Attestation,
_network: &mut NetworkContext,
) {
debug!(
self.log,
"AttestationGossip";
"Attestation";
"peer" => format!("{:?}", peer_id),
);
// Awaiting a proper operations pool before we can import attestations.
//
// https://github.com/sigp/lighthouse/issues/281
match self.chain.process_attestation(msg.attestation) {
match self.chain.process_attestation(msg) {
Ok(_) => panic!("Impossible, method not implemented."),
Err(_) => error!(self.log, "Attestation processing not implemented!"),
}

View File

@ -8,6 +8,7 @@ edition = "2018"
bls = { path = "../../eth2/utils/bls" }
beacon_chain = { path = "../beacon_chain" }
network = { path = "../network" }
eth2-libp2p = { path = "../eth2-libp2p" }
version = { path = "../version" }
types = { path = "../../eth2/types" }
ssz = { path = "../../eth2/utils/ssz" }

View File

@ -1,17 +1,20 @@
use crossbeam_channel;
use eth2_libp2p::rpc::methods::BlockRootSlot;
use eth2_libp2p::PubsubMessage;
use futures::Future;
use grpcio::{RpcContext, UnarySink};
use network::NetworkMessage;
use protos::services::{
BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse,
PublishBeaconBlockRequest, PublishBeaconBlockResponse,
};
use protos::services_grpc::BeaconBlockService;
use slog::Logger;
use crossbeam_channel;
use network::NetworkMessage;
use types::{Hash256, Slot};
#[derive(Clone)]
pub struct BeaconBlockServiceInstance {
network_chan: crossbeam_channel::Sender<NetworkMessage>,
pub network_chan: crossbeam_channel::Sender<NetworkMessage>,
pub log: Logger,
}
@ -47,14 +50,21 @@ impl BeaconBlockService for BeaconBlockServiceInstance {
sink: UnarySink<PublishBeaconBlockResponse>,
) {
let block = req.get_block();
println!("publishing {:?}", block);
let block_root = Hash256::from_slice(block.get_block_root());
let block_slot = BlockRootSlot {
block_root,
slot: Slot::from(block.get_slot()),
};
println!("publishing block with root {:?}", block_root);
// TODO: Obtain from the network properly.
let topic = types::TopicBuilder::from("beacon_chain").build();
// TODO: Obtain topics from the network service properly.
let topic = types::TopicBuilder::new("beacon_chain".to_string()).build();
let message = PubsubMessage::Block(block_slot);
println!("Sending beacon block to gossipsub");
network_chan.send(NetworkMessage::Publish(
self.network_chan.send(NetworkMessage::Publish {
topics: vec![topic],
message,
});
// TODO: actually process the block.
let mut resp = PublishBeaconBlockResponse::new();

View File

@ -11,6 +11,7 @@ use self::validator::ValidatorServiceInstance;
pub use config::Config as RPCConfig;
use futures::{future, Future};
use grpcio::{Environment, Server, ServerBuilder};
use network::NetworkMessage;
use protos::services_grpc::{
create_beacon_block_service, create_beacon_node_service, create_validator_service,
};
@ -42,8 +43,9 @@ pub fn start_server(
let beacon_block_service = {
let instance = BeaconBlockServiceInstance {
network_chan
log: log.clone() };
network_chan,
log: log.clone(),
};
create_beacon_block_service(instance)
};
let validator_service = {