V0.11.0 network update (#976)

* Adjust RPC methods to match v0.11.1

* Adds fork handling for gossipsub topics

* Update gossipsub topics to v0.11.0
This commit is contained in:
Age Manning 2020-04-01 17:20:32 +11:00
parent 5eb4c7d682
commit 88cecd6fb8
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
19 changed files with 247 additions and 226 deletions

View File

@ -1,8 +1,7 @@
use crate::discovery::Discovery; use crate::discovery::Discovery;
use crate::rpc::{RPCEvent, RPCMessage, RPC}; use crate::rpc::{RPCEvent, RPCMessage, RPC};
use crate::types::GossipEncoding; use crate::types::{GossipEncoding, GossipKind, GossipTopic};
use crate::Enr; use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use crate::{error, GossipTopic, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*; use futures::prelude::*;
use libp2p::{ use libp2p::{
core::identity::Keypair, core::identity::Keypair,
@ -47,6 +46,9 @@ pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> {
#[behaviour(ignore)] #[behaviour(ignore)]
network_globals: Arc<NetworkGlobals<TSpec>>, network_globals: Arc<NetworkGlobals<TSpec>>,
#[behaviour(ignore)] #[behaviour(ignore)]
/// Keeps track of the current EnrForkId for upgrading gossipsub topics.
enr_fork_id: EnrForkId,
#[behaviour(ignore)]
/// Logger for behaviour actions. /// Logger for behaviour actions.
log: slog::Logger, log: slog::Logger,
} }
@ -74,7 +76,7 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
discovery: Discovery::new( discovery: Discovery::new(
local_key, local_key,
net_conf, net_conf,
enr_fork_id, enr_fork_id.clone(),
network_globals.clone(), network_globals.clone(),
log, log,
)?, )?,
@ -82,6 +84,7 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
events: Vec::new(), events: Vec::new(),
seen_gossip_messages: LruCache::new(100_000), seen_gossip_messages: LruCache::new(100_000),
network_globals, network_globals,
enr_fork_id,
log: behaviour_log, log: behaviour_log,
}) })
} }
@ -99,25 +102,57 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, TSpec> { impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, TSpec> {
/* Pubsub behaviour functions */ /* Pubsub behaviour functions */
/// Subscribes to a gossipsub topic kind, letting the network service determine the
/// encoding and fork version.
pub fn subscribe_kind(&mut self, kind: GossipKind) -> bool {
let gossip_topic =
GossipTopic::new(kind, GossipEncoding::SSZ, self.enr_fork_id.fork_digest);
self.subscribe(gossip_topic)
}
/// Unsubscribes from a gossipsub topic kind, letting the network service determine the
/// encoding and fork version.
pub fn unsubscribe_kind(&mut self, kind: GossipKind) -> bool {
let gossip_topic =
GossipTopic::new(kind, GossipEncoding::SSZ, self.enr_fork_id.fork_digest);
self.unsubscribe(gossip_topic)
}
/// Subscribes to a specific subnet id;
pub fn subscribe_to_subnet(&mut self, subnet_id: SubnetId) -> bool {
let topic = GossipTopic::new(
subnet_id.into(),
GossipEncoding::SSZ,
self.enr_fork_id.fork_digest,
);
self.subscribe(topic)
}
/// Un-Subscribes from a specific subnet id;
pub fn unsubscribe_from_subnet(&mut self, subnet_id: SubnetId) -> bool {
let topic = GossipTopic::new(
subnet_id.into(),
GossipEncoding::SSZ,
self.enr_fork_id.fork_digest,
);
self.unsubscribe(topic)
}
/// Subscribes to a gossipsub topic. /// Subscribes to a gossipsub topic.
pub fn subscribe(&mut self, topic: GossipTopic) -> bool { fn subscribe(&mut self, topic: GossipTopic) -> bool {
// update the network globals // update the network globals
self.network_globals self.network_globals
.gossipsub_subscriptions .gossipsub_subscriptions
.write() .write()
.insert(topic.clone()); .insert(topic.clone());
// subscribe to the topic
let topic_str: String = topic.clone().into();
debug!(self.log, "Subscribed to topic"; "topic" => topic_str);
self.gossipsub.subscribe(topic.into()) self.gossipsub.subscribe(topic.into())
} }
/// Subscribes to a specific subnet id;
pub fn subscribe_to_subnet(&mut self, subnet_id: SubnetId) {
let topic = GossipTopic::new(subnet_id.into(), GossipEncoding::SSZ);
self.subscribe(topic);
}
/// Unsubscribe from a gossipsub topic. /// Unsubscribe from a gossipsub topic.
pub fn unsubscribe(&mut self, topic: GossipTopic) -> bool { fn unsubscribe(&mut self, topic: GossipTopic) -> bool {
// update the network globals // update the network globals
self.network_globals self.network_globals
.gossipsub_subscriptions .gossipsub_subscriptions
@ -127,17 +162,11 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
self.gossipsub.unsubscribe(topic.into()) self.gossipsub.unsubscribe(topic.into())
} }
/// Un-Subscribes from a specific subnet id;
pub fn unsubscribe_from_subnet(&mut self, subnet_id: SubnetId) {
let topic = GossipTopic::new(subnet_id.into(), GossipEncoding::SSZ);
self.unsubscribe(topic);
}
/// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding. /// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding.
pub fn publish(&mut self, messages: Vec<PubsubMessage<TSpec>>) { pub fn publish(&mut self, messages: Vec<PubsubMessage<TSpec>>) {
for message in messages { for message in messages {
for topic in message.topics() { for topic in message.topics(GossipEncoding::SSZ, self.enr_fork_id.fork_digest) {
let message_data = message.encode(); let message_data = message.encode(GossipEncoding::SSZ);
self.gossipsub.publish(&topic.into(), message_data); self.gossipsub.publish(&topic.into(), message_data);
} }
} }
@ -195,8 +224,30 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
/// Updates the local ENR's "eth2" field with the latest EnrForkId. /// Updates the local ENR's "eth2" field with the latest EnrForkId.
pub fn update_fork_version(&mut self, enr_fork_id: EnrForkId) { pub fn update_fork_version(&mut self, enr_fork_id: EnrForkId) {
self.discovery.update_eth2_enr(enr_fork_id); self.discovery.update_eth2_enr(enr_fork_id.clone());
// TODO: Handle gossipsub fork update
// unsubscribe from all gossip topics and re-subscribe to their new fork counterparts
let subscribed_topics = self
.network_globals
.gossipsub_subscriptions
.read()
.iter()
.cloned()
.collect::<Vec<GossipTopic>>();
// unsubscribe from all topics
for topic in &subscribed_topics {
self.unsubscribe(topic.clone());
}
// re-subscribe modifying the fork version
for mut topic in subscribed_topics {
*topic.digest() = enr_fork_id.fork_digest;
self.subscribe(topic);
}
// update the local reference
self.enr_fork_id = enr_fork_id;
} }
} }

View File

@ -1,4 +1,4 @@
use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use crate::types::GossipKind;
use crate::Enr; use crate::Enr;
use libp2p::discv5::{Discv5Config, Discv5ConfigBuilder}; use libp2p::discv5::{Discv5Config, Discv5ConfigBuilder};
use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageId}; use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageId};
@ -61,7 +61,7 @@ pub struct Config {
pub client_version: String, pub client_version: String,
/// List of extra topics to initially subscribe to as strings. /// List of extra topics to initially subscribe to as strings.
pub topics: Vec<GossipTopic>, pub topics: Vec<GossipKind>,
/// Introduces randomization in network propagation of messages. This should only be set for /// Introduces randomization in network propagation of messages. This should only be set for
/// testing purposes and will likely be removed in future versions. /// testing purposes and will likely be removed in future versions.
@ -78,11 +78,11 @@ impl Default for Config {
// The default topics that we will initially subscribe to // The default topics that we will initially subscribe to
let topics = vec![ let topics = vec![
GossipTopic::new(GossipKind::BeaconBlock, GossipEncoding::SSZ), GossipKind::BeaconBlock,
GossipTopic::new(GossipKind::BeaconAggregateAndProof, GossipEncoding::SSZ), GossipKind::BeaconAggregateAndProof,
GossipTopic::new(GossipKind::VoluntaryExit, GossipEncoding::SSZ), GossipKind::VoluntaryExit,
GossipTopic::new(GossipKind::ProposerSlashing, GossipEncoding::SSZ), GossipKind::ProposerSlashing,
GossipTopic::new(GossipKind::AttesterSlashing, GossipEncoding::SSZ), GossipKind::AttesterSlashing,
]; ];
// The function used to generate a gossipsub message id // The function used to generate a gossipsub message id

View File

@ -16,7 +16,7 @@ pub mod types;
// shift this type into discv5 // shift this type into discv5
pub type Enr = libp2p::discv5::enr::Enr<libp2p::discv5::enr::CombinedKey>; pub type Enr = libp2p::discv5::enr::Enr<libp2p::discv5::enr::CombinedKey>;
pub use crate::types::{error, GossipTopic, NetworkGlobals, PeerInfo, PubsubData, PubsubMessage}; pub use crate::types::{error, GossipTopic, NetworkGlobals, PeerInfo, PubsubMessage};
pub use config::Config as NetworkConfig; pub use config::Config as NetworkConfig;
pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash};
pub use libp2p::{multiaddr, Multiaddr}; pub use libp2p::{multiaddr, Multiaddr};

View File

@ -13,7 +13,7 @@ pub type RequestId = usize;
#[derive(Encode, Decode, Clone, Debug, PartialEq)] #[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct StatusMessage { pub struct StatusMessage {
/// The fork version of the chain we are broadcasting. /// The fork version of the chain we are broadcasting.
pub fork_version: [u8; 4], pub fork_digest: [u8; 4],
/// Latest finalized root. /// Latest finalized root.
pub finalized_root: Hash256, pub finalized_root: Hash256,
@ -101,9 +101,6 @@ impl ssz::Decode for GoodbyeReason {
/// Request a number of beacon block roots from a peer. /// Request a number of beacon block roots from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq)] #[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BlocksByRangeRequest { pub struct BlocksByRangeRequest {
/// The hash tree root of a block on the requested chain.
pub head_block_root: Hash256,
/// The starting slot to request blocks. /// The starting slot to request blocks.
pub start_slot: u64, pub start_slot: u64,
@ -238,7 +235,7 @@ impl ErrorMessage {
impl std::fmt::Display for StatusMessage { impl std::fmt::Display for StatusMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Status Message: Fork Version: {:?}, Finalized Root: {}, Finalized Epoch: {}, Head Root: {}, Head Slot: {}", self.fork_version, self.finalized_root, self.finalized_epoch, self.head_root, self.head_slot) write!(f, "Status Message: Fork Digest: {:?}, Finalized Root: {}, Finalized Epoch: {}, Head Root: {}, Head Slot: {}", self.fork_digest, self.finalized_root, self.finalized_epoch, self.head_root, self.head_slot)
} }
} }
@ -283,8 +280,8 @@ impl std::fmt::Display for BlocksByRangeRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!( write!(
f, f,
"Head Block Root: {}, Start Slot: {}, Count: {}, Step: {}", "Start Slot: {}, Count: {}, Step: {}",
self.head_block_root, self.start_slot, self.count, self.step self.start_slot, self.count, self.step
) )
} }
} }

View File

@ -1,9 +1,8 @@
use crate::behaviour::{Behaviour, BehaviourEvent}; use crate::behaviour::{Behaviour, BehaviourEvent};
use crate::multiaddr::Protocol; use crate::multiaddr::Protocol;
use crate::rpc::RPCEvent; use crate::rpc::RPCEvent;
use crate::types::error; use crate::types::{error, GossipKind};
use crate::NetworkConfig; use crate::{NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use crate::{NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*; use futures::prelude::*;
use futures::Stream; use futures::Stream;
use libp2p::core::{ use libp2p::core::{
@ -144,18 +143,12 @@ impl<TSpec: EthSpec> Service<TSpec> {
} }
} }
let mut subscribed_topics: Vec<String> = vec![]; let mut subscribed_topics: Vec<GossipKind> = vec![];
for topic in &config.topics { for topic_kind in &config.topics {
let topic_string: String = topic.clone().into(); if swarm.subscribe_kind(topic_kind.clone()) {
if swarm.subscribe(topic.clone()) { subscribed_topics.push(topic_kind.clone());
trace!(log, "Subscribed to topic"; "topic" => format!("{}", topic_string));
subscribed_topics.push(topic_string);
network_globals
.gossipsub_subscriptions
.write()
.insert(topic.clone());
} else { } else {
warn!(log, "Could not subscribe to topic"; "topic" => format!("{}",topic_string)); warn!(log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind));
} }
} }
info!(log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics)); info!(log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics));

View File

@ -6,5 +6,5 @@ mod topics;
pub use globals::NetworkGlobals; pub use globals::NetworkGlobals;
pub use peer_info::{EnrBitfield, PeerInfo}; pub use peer_info::{EnrBitfield, PeerInfo};
pub use pubsub::{PubsubData, PubsubMessage}; pub use pubsub::PubsubMessage;
pub use topics::{GossipEncoding, GossipKind, GossipTopic}; pub use topics::{GossipEncoding, GossipKind, GossipTopic};

View File

@ -10,17 +10,8 @@ use types::{
SignedBeaconBlock, VoluntaryExit, SignedBeaconBlock, VoluntaryExit,
}; };
/// Messages that are passed to and from the pubsub (Gossipsub) behaviour.
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub struct PubsubMessage<T: EthSpec> { pub enum PubsubMessage<T: EthSpec> {
/// The encoding to be used to encode/decode the message
pub encoding: GossipEncoding,
/// The actual message being sent.
pub data: PubsubData<T>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum PubsubData<T: EthSpec> {
/// Gossipsub message providing notification of a new block. /// Gossipsub message providing notification of a new block.
BeaconBlock(Box<SignedBeaconBlock<T>>), BeaconBlock(Box<SignedBeaconBlock<T>>),
/// Gossipsub message providing notification of a Aggregate attestation and associated proof. /// Gossipsub message providing notification of a Aggregate attestation and associated proof.
@ -36,36 +27,30 @@ pub enum PubsubData<T: EthSpec> {
} }
impl<T: EthSpec> PubsubMessage<T> { impl<T: EthSpec> PubsubMessage<T> {
pub fn new(encoding: GossipEncoding, data: PubsubData<T>) -> Self { /// Returns the topics that each pubsub message will be sent across, given a supported
PubsubMessage { encoding, data } /// gossipsub encoding and fork version.
pub fn topics(&self, encoding: GossipEncoding, fork_version: [u8; 4]) -> Vec<GossipTopic> {
vec![GossipTopic::new(self.kind(), encoding, fork_version)]
} }
/// Returns the topics that each pubsub message will be sent across, given a supported /// Returns the kind of gossipsub topic associated with the message.
/// gossipsub encoding. pub fn kind(&self) -> GossipKind {
pub fn topics(&self) -> Vec<GossipTopic> { match self {
let encoding = self.encoding.clone(); PubsubMessage::BeaconBlock(_) => GossipKind::BeaconBlock,
match &self.data { PubsubMessage::AggregateAndProofAttestation(_) => GossipKind::BeaconAggregateAndProof,
PubsubData::BeaconBlock(_) => vec![GossipTopic::new(GossipKind::BeaconBlock, encoding)], PubsubMessage::Attestation(attestation_data) => {
PubsubData::AggregateAndProofAttestation(_) => vec![GossipTopic::new( GossipKind::CommitteeIndex(attestation_data.0)
GossipKind::BeaconAggregateAndProof,
encoding,
)],
PubsubData::Attestation(attestation_data) => vec![GossipTopic::new(
GossipKind::CommitteeIndex(attestation_data.0),
encoding,
)],
PubsubData::VoluntaryExit(_) => {
vec![GossipTopic::new(GossipKind::VoluntaryExit, encoding)]
}
PubsubData::ProposerSlashing(_) => {
vec![GossipTopic::new(GossipKind::ProposerSlashing, encoding)]
}
PubsubData::AttesterSlashing(_) => {
vec![GossipTopic::new(GossipKind::AttesterSlashing, encoding)]
} }
PubsubMessage::VoluntaryExit(_) => GossipKind::VoluntaryExit,
PubsubMessage::ProposerSlashing(_) => GossipKind::ProposerSlashing,
PubsubMessage::AttesterSlashing(_) => GossipKind::AttesterSlashing,
} }
} }
/// This decodes `data` into a `PubsubMessage` given a list of topics.
///
/// The topics are checked
/// in order and as soon as one topic matches the decoded data, we return the data.
/* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will /* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will
* need to be modified. * need to be modified.
* *
@ -85,61 +70,48 @@ impl<T: EthSpec> PubsubMessage<T> {
// group each part by encoding type // group each part by encoding type
GossipEncoding::SSZ => { GossipEncoding::SSZ => {
// the ssz decoders // the ssz decoders
let encoding = GossipEncoding::SSZ;
match gossip_topic.kind() { match gossip_topic.kind() {
GossipKind::BeaconAggregateAndProof => { GossipKind::BeaconAggregateAndProof => {
let agg_and_proof = let agg_and_proof =
SignedAggregateAndProof::from_ssz_bytes(data) SignedAggregateAndProof::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?; .map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::new( return Ok(PubsubMessage::AggregateAndProofAttestation(
encoding, Box::new(agg_and_proof),
PubsubData::AggregateAndProofAttestation(Box::new(
agg_and_proof,
)),
)); ));
} }
GossipKind::CommitteeIndex(subnet_id) => { GossipKind::CommitteeIndex(subnet_id) => {
let attestation = Attestation::from_ssz_bytes(data) let attestation = Attestation::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?; .map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::new( return Ok(PubsubMessage::Attestation(Box::new((
encoding, *subnet_id,
PubsubData::Attestation(Box::new(( attestation,
*subnet_id, ))));
attestation,
))),
));
} }
GossipKind::BeaconBlock => { GossipKind::BeaconBlock => {
let beacon_block = SignedBeaconBlock::from_ssz_bytes(data) let beacon_block = SignedBeaconBlock::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?; .map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::new( return Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block)));
encoding,
PubsubData::BeaconBlock(Box::new(beacon_block)),
));
} }
GossipKind::VoluntaryExit => { GossipKind::VoluntaryExit => {
let voluntary_exit = VoluntaryExit::from_ssz_bytes(data) let voluntary_exit = VoluntaryExit::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?; .map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::new( return Ok(PubsubMessage::VoluntaryExit(Box::new(
encoding, voluntary_exit,
PubsubData::VoluntaryExit(Box::new(voluntary_exit)), )));
));
} }
GossipKind::ProposerSlashing => { GossipKind::ProposerSlashing => {
let proposer_slashing = ProposerSlashing::from_ssz_bytes(data) let proposer_slashing = ProposerSlashing::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?; .map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::new( return Ok(PubsubMessage::ProposerSlashing(Box::new(
encoding, proposer_slashing,
PubsubData::ProposerSlashing(Box::new(proposer_slashing)), )));
));
} }
GossipKind::AttesterSlashing => { GossipKind::AttesterSlashing => {
let attester_slashing = AttesterSlashing::from_ssz_bytes(data) let attester_slashing = AttesterSlashing::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?; .map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::new( return Ok(PubsubMessage::AttesterSlashing(Box::new(
encoding, attester_slashing,
PubsubData::AttesterSlashing(Box::new(attester_slashing)), )));
));
} }
} }
} }
@ -150,19 +122,19 @@ impl<T: EthSpec> PubsubMessage<T> {
Err(format!("Unknown gossipsub topics: {:?}", unknown_topics)) Err(format!("Unknown gossipsub topics: {:?}", unknown_topics))
} }
/// Encodes a pubsub message based on the topic encodings. The first known encoding is used. If /// Encodes a `PubsubMessage` based on the topic encodings. The first known encoding is used. If
/// no encoding is known, and error is returned. /// no encoding is known, and error is returned.
pub fn encode(&self) -> Vec<u8> { pub fn encode(&self, encoding: GossipEncoding) -> Vec<u8> {
match self.encoding { match encoding {
GossipEncoding::SSZ => { GossipEncoding::SSZ => {
// SSZ Encodings // SSZ Encodings
return match &self.data { return match &self {
PubsubData::BeaconBlock(data) => data.as_ssz_bytes(), PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(),
PubsubData::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(),
PubsubData::VoluntaryExit(data) => data.as_ssz_bytes(), PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(),
PubsubData::ProposerSlashing(data) => data.as_ssz_bytes(), PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(),
PubsubData::AttesterSlashing(data) => data.as_ssz_bytes(), PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(),
PubsubData::Attestation(data) => data.1.as_ssz_bytes(), PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(),
}; };
} }
} }

View File

@ -23,11 +23,14 @@ pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing";
pub struct GossipTopic { pub struct GossipTopic {
/// The encoding of the topic. /// The encoding of the topic.
encoding: GossipEncoding, encoding: GossipEncoding,
/// The fork digest of the topic,
fork_digest: [u8; 4],
/// The kind of topic. /// The kind of topic.
kind: GossipKind, kind: GossipKind,
} }
/// Enum that brings these topics into the rust type system. /// Enum that brings these topics into the rust type system.
// NOTE: There is intentionally no unknown type here. We only allow known gossipsub topics.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum GossipKind { pub enum GossipKind {
/// Topic for publishing beacon blocks. /// Topic for publishing beacon blocks.
@ -44,6 +47,19 @@ pub enum GossipKind {
AttesterSlashing, AttesterSlashing,
} }
impl std::fmt::Display for GossipKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
GossipKind::BeaconBlock => write!(f, "beacon_block"),
GossipKind::BeaconAggregateAndProof => write!(f, "beacon_aggregate_and_proof"),
GossipKind::CommitteeIndex(subnet_id) => write!(f, "committee_index_{}", **subnet_id),
GossipKind::VoluntaryExit => write!(f, "voluntary_exit"),
GossipKind::ProposerSlashing => write!(f, "proposer_slashing"),
GossipKind::AttesterSlashing => write!(f, "attester_slashing"),
}
}
}
/// The known encoding types for gossipsub messages. /// The known encoding types for gossipsub messages.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum GossipEncoding { pub enum GossipEncoding {
@ -52,8 +68,12 @@ pub enum GossipEncoding {
} }
impl GossipTopic { impl GossipTopic {
pub fn new(kind: GossipKind, encoding: GossipEncoding) -> Self { pub fn new(kind: GossipKind, encoding: GossipEncoding, fork_digest: [u8; 4]) -> Self {
GossipTopic { encoding, kind } GossipTopic {
encoding,
kind,
fork_digest,
}
} }
/// Returns the encoding type for the gossipsub topic. /// Returns the encoding type for the gossipsub topic.
@ -61,6 +81,11 @@ impl GossipTopic {
&self.encoding &self.encoding
} }
/// Returns a mutable reference to the fork digest of the gossipsub topic.
pub fn digest(&mut self) -> &mut [u8; 4] {
&mut self.fork_digest
}
/// Returns the kind of message expected on the gossipsub topic. /// Returns the kind of message expected on the gossipsub topic.
pub fn kind(&self) -> &GossipKind { pub fn kind(&self) -> &GossipKind {
&self.kind &self.kind
@ -68,12 +93,25 @@ impl GossipTopic {
pub fn decode(topic: &str) -> Result<Self, String> { pub fn decode(topic: &str) -> Result<Self, String> {
let topic_parts: Vec<&str> = topic.split('/').collect(); let topic_parts: Vec<&str> = topic.split('/').collect();
if topic_parts.len() == 4 && topic_parts[1] == TOPIC_PREFIX { if topic_parts.len() == 5 && topic_parts[1] == TOPIC_PREFIX {
let encoding = match topic_parts[3] { let digest_bytes = hex::decode(topic_parts[2])
.map_err(|e| format!("Could not decode fork_digest hex: {}", e))?;
if digest_bytes.len() != 4 {
return Err(format!(
"Invalid gossipsub fork digest size: {}",
digest_bytes.len()
));
}
let mut fork_digest = [0; 4];
fork_digest.copy_from_slice(&digest_bytes);
let encoding = match topic_parts[4] {
SSZ_ENCODING_POSTFIX => GossipEncoding::SSZ, SSZ_ENCODING_POSTFIX => GossipEncoding::SSZ,
_ => return Err(format!("Unknown encoding: {}", topic)), _ => return Err(format!("Unknown encoding: {}", topic)),
}; };
let kind = match topic_parts[2] { let kind = match topic_parts[3] {
BEACON_BLOCK_TOPIC => GossipKind::BeaconBlock, BEACON_BLOCK_TOPIC => GossipKind::BeaconBlock,
BEACON_AGGREGATE_AND_PROOF_TOPIC => GossipKind::BeaconAggregateAndProof, BEACON_AGGREGATE_AND_PROOF_TOPIC => GossipKind::BeaconAggregateAndProof,
VOLUNTARY_EXIT_TOPIC => GossipKind::VoluntaryExit, VOLUNTARY_EXIT_TOPIC => GossipKind::VoluntaryExit,
@ -85,7 +123,11 @@ impl GossipTopic {
}, },
}; };
return Ok(GossipTopic { encoding, kind }); return Ok(GossipTopic {
encoding,
kind,
fork_digest,
});
} }
Err(format!("Unknown topic: {}", topic)) Err(format!("Unknown topic: {}", topic))
@ -115,7 +157,13 @@ impl Into<String> for GossipTopic {
COMMITEE_INDEX_TOPIC_PREFIX, *index, COMMITEE_INDEX_TOPIC_POSTFIX COMMITEE_INDEX_TOPIC_PREFIX, *index, COMMITEE_INDEX_TOPIC_POSTFIX
), ),
}; };
format!("/{}/{}/{}", TOPIC_PREFIX, kind, encoding) format!(
"/{}/{}/{}/{}",
TOPIC_PREFIX,
hex::encode(self.fork_digest),
kind,
encoding
)
} }
} }

View File

@ -33,9 +33,13 @@ fn test_gossipsub_forward() {
message: empty_block, message: empty_block,
signature: Signature::empty_signature(), signature: Signature::empty_signature(),
}; };
let data = PubsubData::BeaconBlock(Box::new(signed_block)); let pubsub_message = PubsubMessage::BeaconBlock(Box::new(signed_block));
let pubsub_message = PubsubMessage::new(GossipEncoding::SSZ, data); let publishing_topic: String = pubsub_message
let publishing_topic: String = "/eth2/beacon_block/ssz".into(); .topics(GossipEncoding::SSZ, [0, 0, 0, 0])
.first()
.unwrap()
.clone()
.into();
let mut subscribed_count = 0; let mut subscribed_count = 0;
tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { tokio::run(futures::future::poll_fn(move || -> Result<_, ()> {
for node in nodes.iter_mut() { for node in nodes.iter_mut() {
@ -65,10 +69,8 @@ fn test_gossipsub_forward() {
} }
} }
Async::Ready(Some(Libp2pEvent::PeerSubscribed(_, topic))) => { Async::Ready(Some(Libp2pEvent::PeerSubscribed(_, topic))) => {
// Received topics is one of subscribed eth2 topics
assert!(topic.clone().into_string().starts_with("/eth2/"));
// Publish on beacon block topic // Publish on beacon block topic
if topic == TopicHash::from_raw("/eth2/beacon_block/ssz") { if topic == TopicHash::from_raw(publishing_topic.clone()) {
subscribed_count += 1; subscribed_count += 1;
// Every node except the corner nodes are connected to 2 nodes. // Every node except the corner nodes are connected to 2 nodes.
if subscribed_count == (num_nodes * 2) - 2 { if subscribed_count == (num_nodes * 2) - 2 {
@ -104,9 +106,13 @@ fn test_gossipsub_full_mesh_publish() {
message: empty_block, message: empty_block,
signature: Signature::empty_signature(), signature: Signature::empty_signature(),
}; };
let data = PubsubData::BeaconBlock(Box::new(signed_block)); let pubsub_message = PubsubMessage::BeaconBlock(Box::new(signed_block));
let pubsub_message = PubsubMessage::new(GossipEncoding::SSZ, data); let publishing_topic: String = pubsub_message
let publishing_topic: String = "/eth2/beacon_block/ssz".into(); .topics(GossipEncoding::SSZ, [0, 0, 0, 0])
.first()
.unwrap()
.clone()
.into();
let mut subscribed_count = 0; let mut subscribed_count = 0;
let mut received_count = 0; let mut received_count = 0;
tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { tokio::run(futures::future::poll_fn(move || -> Result<_, ()> {
@ -132,10 +138,8 @@ fn test_gossipsub_full_mesh_publish() {
while let Async::Ready(Some(Libp2pEvent::PeerSubscribed(_, topic))) = while let Async::Ready(Some(Libp2pEvent::PeerSubscribed(_, topic))) =
publishing_node.poll().unwrap() publishing_node.poll().unwrap()
{ {
// Received topics is one of subscribed eth2 topics
assert!(topic.clone().into_string().starts_with("/eth2/"));
// Publish on beacon block topic // Publish on beacon block topic
if topic == TopicHash::from_raw("/eth2/beacon_block/ssz") { if topic == TopicHash::from_raw(publishing_topic.clone()) {
subscribed_count += 1; subscribed_count += 1;
if subscribed_count == num_nodes - 1 { if subscribed_count == num_nodes - 1 {
publishing_node.swarm.publish(vec![pubsub_message.clone()]); publishing_node.swarm.publish(vec![pubsub_message.clone()]);

View File

@ -29,7 +29,7 @@ fn test_status_rpc() {
// Dummy STATUS RPC message // Dummy STATUS RPC message
let rpc_request = RPCRequest::Status(StatusMessage { let rpc_request = RPCRequest::Status(StatusMessage {
fork_version: [0; 4], fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0), finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1), finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0), head_root: Hash256::from_low_u64_be(0),
@ -38,7 +38,7 @@ fn test_status_rpc() {
// Dummy STATUS RPC message // Dummy STATUS RPC message
let rpc_response = RPCResponse::Status(StatusMessage { let rpc_response = RPCResponse::Status(StatusMessage {
fork_version: [0; 4], fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0), finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1), finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0), head_root: Hash256::from_low_u64_be(0),
@ -142,7 +142,6 @@ fn test_blocks_by_range_chunked_rpc() {
// BlocksByRange Request // BlocksByRange Request
let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest { let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest {
head_block_root: Hash256::from_low_u64_be(0),
start_slot: 0, start_slot: 0,
count: messages_to_send, count: messages_to_send,
step: 0, step: 0,
@ -275,7 +274,6 @@ fn test_blocks_by_range_single_empty_rpc() {
// BlocksByRange Request // BlocksByRange Request
let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest { let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest {
head_block_root: Hash256::from_low_u64_be(0),
start_slot: 0, start_slot: 0,
count: 10, count: 10,
step: 0, step: 0,

View File

@ -11,7 +11,7 @@ use crate::service::NetworkMessage;
use beacon_chain::{AttestationType, BeaconChain, BeaconChainTypes}; use beacon_chain::{AttestationType, BeaconChain, BeaconChainTypes};
use eth2_libp2p::{ use eth2_libp2p::{
rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId, ResponseTermination}, rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId, ResponseTermination},
MessageId, PeerId, PubsubData, PubsubMessage, RPCEvent, MessageId, PeerId, PubsubMessage, RPCEvent,
}; };
use futures::future::Future; use futures::future::Future;
use futures::stream::Stream; use futures::stream::Stream;
@ -217,9 +217,9 @@ impl<T: BeaconChainTypes> Router<T> {
peer_id: PeerId, peer_id: PeerId,
gossip_message: PubsubMessage<T::EthSpec>, gossip_message: PubsubMessage<T::EthSpec>,
) { ) {
match gossip_message.data { match gossip_message {
// Attestations should never reach the router. // Attestations should never reach the router.
PubsubData::AggregateAndProofAttestation(aggregate_and_proof) => { PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => {
if self if self
.processor .processor
.should_forward_aggregate_attestation(&aggregate_and_proof) .should_forward_aggregate_attestation(&aggregate_and_proof)
@ -232,7 +232,7 @@ impl<T: BeaconChainTypes> Router<T> {
AttestationType::Aggregated, AttestationType::Aggregated,
); );
} }
PubsubData::Attestation(subnet_attestation) => { PubsubMessage::Attestation(subnet_attestation) => {
if self if self
.processor .processor
.should_forward_attestation(&subnet_attestation.1) .should_forward_attestation(&subnet_attestation.1)
@ -245,7 +245,7 @@ impl<T: BeaconChainTypes> Router<T> {
AttestationType::Unaggregated { should_store: true }, AttestationType::Unaggregated { should_store: true },
); );
} }
PubsubData::BeaconBlock(block) => match self.processor.should_forward_block(block) { PubsubMessage::BeaconBlock(block) => match self.processor.should_forward_block(block) {
Ok(verified_block) => { Ok(verified_block) => {
self.propagate_message(id, peer_id.clone()); self.propagate_message(id, peer_id.clone());
self.processor.on_block_gossip(peer_id, verified_block); self.processor.on_block_gossip(peer_id, verified_block);
@ -255,19 +255,19 @@ impl<T: BeaconChainTypes> Router<T> {
"error" => format!("{:?}", e)); "error" => format!("{:?}", e));
} }
}, },
PubsubData::VoluntaryExit(_exit) => { PubsubMessage::VoluntaryExit(_exit) => {
// TODO: Apply more sophisticated validation // TODO: Apply more sophisticated validation
self.propagate_message(id, peer_id.clone()); self.propagate_message(id, peer_id.clone());
// TODO: Handle exits // TODO: Handle exits
debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id) ); debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id) );
} }
PubsubData::ProposerSlashing(_proposer_slashing) => { PubsubMessage::ProposerSlashing(_proposer_slashing) => {
// TODO: Apply more sophisticated validation // TODO: Apply more sophisticated validation
self.propagate_message(id, peer_id.clone()); self.propagate_message(id, peer_id.clone());
// TODO: Handle proposer slashings // TODO: Handle proposer slashings
debug!(self.log, "Received a proposer slashing"; "peer_id" => format!("{}", peer_id) ); debug!(self.log, "Received a proposer slashing"; "peer_id" => format!("{}", peer_id) );
} }
PubsubData::AttesterSlashing(_attester_slashing) => { PubsubMessage::AttesterSlashing(_attester_slashing) => {
// TODO: Apply more sophisticated validation // TODO: Apply more sophisticated validation
self.propagate_message(id, peer_id.clone()); self.propagate_message(id, peer_id.clone());
// TODO: Handle attester slashings // TODO: Handle attester slashings

View File

@ -25,7 +25,7 @@ pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;
/// Keeps track of syncing information for known connected peers. /// Keeps track of syncing information for known connected peers.
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub struct PeerSyncInfo { pub struct PeerSyncInfo {
fork_version: [u8; 4], fork_digest: [u8; 4],
pub finalized_root: Hash256, pub finalized_root: Hash256,
pub finalized_epoch: Epoch, pub finalized_epoch: Epoch,
pub head_root: Hash256, pub head_root: Hash256,
@ -35,7 +35,7 @@ pub struct PeerSyncInfo {
impl From<StatusMessage> for PeerSyncInfo { impl From<StatusMessage> for PeerSyncInfo {
fn from(status: StatusMessage) -> PeerSyncInfo { fn from(status: StatusMessage) -> PeerSyncInfo {
PeerSyncInfo { PeerSyncInfo {
fork_version: status.fork_version, fork_digest: status.fork_digest,
finalized_root: status.finalized_root, finalized_root: status.finalized_root,
finalized_epoch: status.finalized_epoch, finalized_epoch: status.finalized_epoch,
head_root: status.head_root, head_root: status.head_root,
@ -123,7 +123,7 @@ impl<T: BeaconChainTypes> Processor<T> {
self.log, self.log,
"Sending Status Request"; "Sending Status Request";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"fork_version" => format!("{:?}", status_message.fork_version), "fork_digest" => format!("{:?}", status_message.fork_digest),
"finalized_root" => format!("{:?}", status_message.finalized_root), "finalized_root" => format!("{:?}", status_message.finalized_root),
"finalized_epoch" => format!("{:?}", status_message.finalized_epoch), "finalized_epoch" => format!("{:?}", status_message.finalized_epoch),
"head_root" => format!("{}", status_message.head_root), "head_root" => format!("{}", status_message.head_root),
@ -147,7 +147,7 @@ impl<T: BeaconChainTypes> Processor<T> {
self.log, self.log,
"Received Status Request"; "Received Status Request";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"fork_version" => format!("{:?}", status.fork_version), "fork_digest" => format!("{:?}", status.fork_digest),
"finalized_root" => format!("{:?}", status.finalized_root), "finalized_root" => format!("{:?}", status.finalized_root),
"finalized_epoch" => format!("{:?}", status.finalized_epoch), "finalized_epoch" => format!("{:?}", status.finalized_epoch),
"head_root" => format!("{}", status.head_root), "head_root" => format!("{}", status.head_root),
@ -193,12 +193,14 @@ impl<T: BeaconChainTypes> Processor<T> {
let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch());
if local.fork_version != remote.fork_version { if local.fork_digest != remote.fork_digest {
// The node is on a different network/fork, disconnect them. // The node is on a different network/fork, disconnect them.
debug!( debug!(
self.log, "Handshake Failure"; self.log, "Handshake Failure";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"reason" => "network_id" "reason" => "incompatible forks",
"our_fork" => hex::encode(local.fork_digest),
"their_fork" => hex::encode(remote.fork_digest)
); );
self.network self.network
@ -631,8 +633,9 @@ pub(crate) fn status_message<T: BeaconChainTypes>(
) -> Option<StatusMessage> { ) -> Option<StatusMessage> {
let head_info = beacon_chain.head_info().ok()?; let head_info = beacon_chain.head_info().ok()?;
// TODO: Update fork digest calculation
Some(StatusMessage { Some(StatusMessage {
fork_version: head_info.fork.current_version, fork_digest: head_info.fork.current_version,
finalized_root: head_info.finalized_checkpoint.root, finalized_root: head_info.finalized_checkpoint.root,
finalized_epoch: head_info.finalized_checkpoint.epoch, finalized_epoch: head_info.finalized_checkpoint.epoch,
head_root: head_info.block_root, head_root: head_info.block_root,

View File

@ -8,7 +8,7 @@ use crate::{
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{rpc::RPCRequest, Enr, Libp2pEvent, MessageId, NetworkGlobals, PeerId, Swarm}; use eth2_libp2p::{rpc::RPCRequest, Enr, Libp2pEvent, MessageId, NetworkGlobals, PeerId, Swarm};
use eth2_libp2p::{PubsubData, PubsubMessage, RPCEvent}; use eth2_libp2p::{PubsubMessage, RPCEvent};
use futures::prelude::*; use futures::prelude::*;
use futures::Stream; use futures::Stream;
use rest_types::ValidatorSubscription; use rest_types::ValidatorSubscription;
@ -217,15 +217,13 @@ fn spawn_service<T: BeaconChainTypes>(
if !should_send { if !should_send {
info!(log, "Random filter did not publish messages"); info!(log, "Random filter did not publish messages");
} else { } else {
let mut unique_topics = Vec::new(); let mut topic_kinds = Vec::new();
for message in &messages { for message in &messages {
for topic in message.topics() { if !topic_kinds.contains(&message.kind()) {
if !unique_topics.contains(&topic) { topic_kinds.push(message.kind());
unique_topics.push(topic);
} }
} }
} debug!(log, "Sending pubsub messages"; "count" => messages.len(), "topics" => format!("{:?}", topic_kinds));
debug!(log, "Sending pubsub messages"; "count" => messages.len(), "topics" => format!("{:?}", unique_topics));
service.libp2p.swarm.publish(messages); service.libp2p.swarm.publish(messages);
} }
} }
@ -310,9 +308,9 @@ fn spawn_service<T: BeaconChainTypes>(
.. ..
} => { } => {
match message.data { match message {
// attestation information gets processed in the attestation service // attestation information gets processed in the attestation service
PubsubData::Attestation(ref subnet_and_attestation) => { PubsubMessage::Attestation(ref subnet_and_attestation) => {
let subnet = &subnet_and_attestation.0; let subnet = &subnet_and_attestation.0;
let attestation = &subnet_and_attestation.1; let attestation = &subnet_and_attestation.1;
// checks if we have an aggregator for the slot. If so, we process // checks if we have an aggregator for the slot. If so, we process

View File

@ -43,7 +43,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
self.log, self.log,
"Sending Status Request"; "Sending Status Request";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"fork_version" => format!("{:?}", status_message.fork_version), "fork_digest" => format!("{:?}", status_message.fork_digest),
"finalized_root" => format!("{:?}", status_message.finalized_root), "finalized_root" => format!("{:?}", status_message.finalized_root),
"finalized_epoch" => format!("{:?}", status_message.finalized_epoch), "finalized_epoch" => format!("{:?}", status_message.finalized_epoch),
"head_root" => format!("{}", status_message.head_root), "head_root" => format!("{}", status_message.head_root),

View File

@ -9,7 +9,7 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::ops::Sub; use std::ops::Sub;
use types::{EthSpec, Hash256, SignedBeaconBlock, Slot}; use types::{EthSpec, SignedBeaconBlock, Slot};
#[derive(Copy, Clone, Debug, PartialEq)] #[derive(Copy, Clone, Debug, PartialEq)]
pub struct BatchId(pub u64); pub struct BatchId(pub u64);
@ -41,8 +41,6 @@ pub struct Batch<T: EthSpec> {
pub start_slot: Slot, pub start_slot: Slot,
/// The requested end slot of batch, exclusive. /// The requested end slot of batch, exclusive.
pub end_slot: Slot, pub end_slot: Slot,
/// The hash of the chain root to requested from the peer.
pub head_root: Hash256,
/// The peer that was originally assigned to the batch. /// The peer that was originally assigned to the batch.
pub original_peer: PeerId, pub original_peer: PeerId,
/// The peer that is currently assigned to the batch. /// The peer that is currently assigned to the batch.
@ -61,18 +59,11 @@ pub struct Batch<T: EthSpec> {
impl<T: EthSpec> Eq for Batch<T> {} impl<T: EthSpec> Eq for Batch<T> {}
impl<T: EthSpec> Batch<T> { impl<T: EthSpec> Batch<T> {
pub fn new( pub fn new(id: BatchId, start_slot: Slot, end_slot: Slot, peer_id: PeerId) -> Self {
id: BatchId,
start_slot: Slot,
end_slot: Slot,
head_root: Hash256,
peer_id: PeerId,
) -> Self {
Batch { Batch {
id, id,
start_slot, start_slot,
end_slot, end_slot,
head_root,
original_peer: peer_id.clone(), original_peer: peer_id.clone(),
current_peer: peer_id, current_peer: peer_id,
retries: 0, retries: 0,
@ -84,7 +75,6 @@ impl<T: EthSpec> Batch<T> {
pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest {
BlocksByRangeRequest { BlocksByRangeRequest {
head_block_root: self.head_root,
start_slot: self.start_slot.into(), start_slot: self.start_slot.into(),
count: std::cmp::min(BLOCKS_PER_BATCH, self.end_slot.sub(self.start_slot).into()), count: std::cmp::min(BLOCKS_PER_BATCH, self.end_slot.sub(self.start_slot).into()),
step: 1, step: 1,

View File

@ -449,7 +449,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
"end_slot" => batch.end_slot, "end_slot" => batch.end_slot,
"id" => *batch.id, "id" => *batch.id,
"peer" => format!("{}", batch.current_peer), "peer" => format!("{}", batch.current_peer),
"head_root"=> format!("{}", batch.head_root),
"retries" => batch.retries, "retries" => batch.retries,
"re-processes" => batch.reprocess_retries); "re-processes" => batch.reprocess_retries);
self.send_batch(network, batch); self.send_batch(network, batch);
@ -578,8 +577,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
"start_slot" => batch.start_slot, "start_slot" => batch.start_slot,
"end_slot" => batch.end_slot, "end_slot" => batch.end_slot,
"id" => *batch.id, "id" => *batch.id,
"peer" => format!("{:?}", batch.current_peer), "peer" => format!("{:?}", batch.current_peer));
"head_root"=> format!("{}", batch.head_root));
self.send_batch(network, batch); self.send_batch(network, batch);
ProcessingResult::KeepChain ProcessingResult::KeepChain
} }
@ -603,8 +601,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
"start_slot" => batch.start_slot, "start_slot" => batch.start_slot,
"end_slot" => batch.end_slot, "end_slot" => batch.end_slot,
"id" => *batch.id, "id" => *batch.id,
"peer" => format!("{}", batch.current_peer), "peer" => format!("{}", batch.current_peer));
"head_root"=> format!("{}", batch.head_root));
// send the batch // send the batch
self.send_batch(network, batch); self.send_batch(network, batch);
return true; return true;
@ -675,7 +672,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch_id, batch_id,
batch_start_slot, batch_start_slot,
batch_end_slot, batch_end_slot,
self.target_head_root,
peer_id, peer_id,
)) ))
} }

View File

@ -1,8 +1,7 @@
use crate::{ApiError, ApiResult, NetworkChannel}; use crate::{ApiError, ApiResult, NetworkChannel};
use beacon_chain::{BeaconChain, BeaconChainTypes, StateSkipConfig}; use beacon_chain::{BeaconChain, BeaconChainTypes, StateSkipConfig};
use bls::PublicKeyBytes; use bls::PublicKeyBytes;
use eth2_libp2p::types::GossipEncoding; use eth2_libp2p::PubsubMessage;
use eth2_libp2p::{PubsubData, PubsubMessage};
use hex; use hex;
use http::header; use http::header;
use hyper::{Body, Request}; use hyper::{Body, Request};
@ -235,10 +234,7 @@ pub fn publish_beacon_block_to_network<T: BeaconChainTypes + 'static>(
block: SignedBeaconBlock<T::EthSpec>, block: SignedBeaconBlock<T::EthSpec>,
) -> Result<(), ApiError> { ) -> Result<(), ApiError> {
// send the block via SSZ encoding // send the block via SSZ encoding
let messages = vec![PubsubMessage::new( let messages = vec![PubsubMessage::BeaconBlock(Box::new(block))];
GossipEncoding::SSZ,
PubsubData::BeaconBlock(Box::new(block)),
)];
// Publish the block to the p2p network via gossipsub. // Publish the block to the p2p network via gossipsub.
if let Err(e) = chan.try_send(NetworkMessage::Publish { messages }) { if let Err(e) = chan.try_send(NetworkMessage::Publish { messages }) {
@ -261,10 +257,7 @@ pub fn publish_raw_attestations_to_network<T: BeaconChainTypes + 'static>(
.map(|attestation| { .map(|attestation| {
// create the gossip message to send to the network // create the gossip message to send to the network
let subnet_id = attestation.subnet_id(); let subnet_id = attestation.subnet_id();
PubsubMessage::new( PubsubMessage::Attestation(Box::new((subnet_id, attestation)))
GossipEncoding::SSZ,
PubsubData::Attestation(Box::new((subnet_id, attestation))),
)
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -286,12 +279,7 @@ pub fn publish_aggregate_attestations_to_network<T: BeaconChainTypes + 'static>(
) -> Result<(), ApiError> { ) -> Result<(), ApiError> {
let messages = signed_proofs let messages = signed_proofs
.into_iter() .into_iter()
.map(|signed_proof| { .map(|signed_proof| PubsubMessage::AggregateAndProofAttestation(Box::new(signed_proof)))
PubsubMessage::new(
GossipEncoding::SSZ,
PubsubData::AggregateAndProofAttestation(Box::new(signed_proof)),
)
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// Publish the attestations to the p2p network via gossipsub. // Publish the attestations to the p2p network via gossipsub.

View File

@ -125,14 +125,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
This disables this feature, fixing the ENR's IP/PORT to those specified on boot.") This disables this feature, fixing the ENR's IP/PORT to those specified on boot.")
.takes_value(true), .takes_value(true),
) )
.arg(
Arg::with_name("topics")
.long("topics")
.value_name("STRING")
.help("One or more comma-delimited gossipsub topic strings to subscribe to. Default \
is determined automatically.")
.takes_value(true),
)
.arg( .arg(
Arg::with_name("libp2p-addresses") Arg::with_name("libp2p-addresses")
.long("libp2p-addresses") .long("libp2p-addresses")

View File

@ -1,7 +1,7 @@
use clap::ArgMatches; use clap::ArgMatches;
use client::{config::DEFAULT_DATADIR, ClientConfig, ClientGenesis, Eth2Config}; use client::{config::DEFAULT_DATADIR, ClientConfig, ClientGenesis, Eth2Config};
use eth2_config::{read_from_file, write_to_file}; use eth2_config::{read_from_file, write_to_file};
use eth2_libp2p::{Enr, GossipTopic, Multiaddr}; use eth2_libp2p::{Enr, Multiaddr};
use eth2_testnet_config::Eth2TestnetConfig; use eth2_testnet_config::Eth2TestnetConfig;
use genesis::recent_genesis_time; use genesis::recent_genesis_time;
use rand::{distributions::Alphanumeric, Rng}; use rand::{distributions::Alphanumeric, Rng};
@ -141,15 +141,6 @@ pub fn get_configs<E: EthSpec>(
.collect::<Result<Vec<Multiaddr>>>()?; .collect::<Result<Vec<Multiaddr>>>()?;
} }
if let Some(topics_str) = cli_args.value_of("topics") {
let mut topics = Vec::new();
let topic_list = topics_str.split(',').collect::<Vec<_>>();
for topic_str in topic_list {
topics.push(GossipTopic::decode(topic_str)?);
}
client_config.network.topics = topics;
}
if let Some(enr_address_str) = cli_args.value_of("enr-address") { if let Some(enr_address_str) = cli_args.value_of("enr-address") {
client_config.network.enr_address = Some( client_config.network.enr_address = Some(
enr_address_str enr_address_str