Initial Interop Updates (#492)

* Add interop chain spec and rename chain_id

* Add ability to connect to raw libp2p nodes

* Adds Identify protocol, clean up RPC protocol name handling

* Update to latest libp2p, gossipsub improvements

* Updates to latest interop branch.

- Shifts decoding of objects into message handler.
- Updates to latest interop gossipsub.
- Adds interop spec constant.

* Configuration updates allow for verbosity CLI flag and spec constants

* Update submodules to master

* Correct minimal chainspec modifications

* Duplication of validator polls are no longer fatal

* Apply PR suggestions
This commit is contained in:
Age Manning 2019-08-10 11:44:17 +10:00 committed by Paul Hauner
parent 3a1f56a42e
commit 468015f9bb
28 changed files with 590 additions and 436 deletions

View File

@ -1,3 +1,4 @@
use crate::Eth2Config;
use clap::ArgMatches;
use http_server::HttpServerConfig;
use network::NetworkConfig;
@ -56,8 +57,6 @@ impl Default for Config {
log_file: PathBuf::from(""),
db_type: "disk".to_string(),
db_name: "chain_db".to_string(),
// Note: there are no default bootnodes specified.
// Once bootnodes are established, add them here.
network: NetworkConfig::new(),
rpc: rpc::RPCConfig::default(),
http: HttpServerConfig::default(),
@ -129,6 +128,15 @@ impl Config {
self.data_dir = PathBuf::from(dir);
};
if let Some(default_spec) = args.value_of("default-spec") {
match default_spec {
"mainnet" => self.spec_constants = Eth2Config::mainnet().spec_constants,
"minimal" => self.spec_constants = Eth2Config::minimal().spec_constants,
"interop" => self.spec_constants = Eth2Config::interop().spec_constants,
_ => {} // not supported
}
}
if let Some(dir) = args.value_of("db") {
self.db_type = dir.to_string();
};

View File

@ -7,8 +7,8 @@ edition = "2018"
[dependencies]
clap = "2.32.0"
#SigP repository
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b" }
enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b", features = ["serde"] }
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b0d3cf7b4b0fa6c555b64dbdd110673a05457abd" }
enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "b0d3cf7b4b0fa6c555b64dbdd110673a05457abd", features = ["serde"] }
types = { path = "../../eth2/types" }
serde = "1.0"
serde_derive = "1.0"

View File

@ -2,47 +2,52 @@ use crate::discovery::Discovery;
use crate::rpc::{RPCEvent, RPCMessage, RPC};
use crate::{error, NetworkConfig};
use crate::{Topic, TopicHash};
use crate::{BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC};
use futures::prelude::*;
use libp2p::{
core::{
identity::Keypair,
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
},
core::identity::Keypair,
discv5::Discv5Event,
gossipsub::{Gossipsub, GossipsubEvent},
identify::{Identify, IdentifyEvent},
ping::{Ping, PingConfig, PingEvent},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId,
};
use slog::{o, trace, warn};
use ssz::{ssz_encode, Decode, DecodeError, Encode};
use slog::{debug, o, trace};
use ssz::{ssz_encode, Encode};
use std::num::NonZeroU32;
use std::time::Duration;
use types::{Attestation, BeaconBlock, EthSpec};
const MAX_IDENTIFY_ADDRESSES: usize = 20;
/// Builds the network behaviour that manages the core protocols of eth2.
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
/// behaviours.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourEvent<E>", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> {
#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
/// The routing pub-sub mechanism for eth2.
gossipsub: Gossipsub<TSubstream>,
/// The serenity RPC specified in the wire-0 protocol.
serenity_rpc: RPC<TSubstream, E>,
/// The Eth2 RPC specified in the wire-0 protocol.
eth2_rpc: RPC<TSubstream>,
/// Keep regular connection to peers and disconnect if absent.
// TODO: Remove Libp2p ping in favour of discv5 ping.
ping: Ping<TSubstream>,
/// Kademlia for peer discovery.
// TODO: Using id for initial interop. This will be removed by mainnet.
/// Provides IP addresses and peer information.
identify: Identify<TSubstream>,
/// Discovery behaviour.
discovery: Discovery<TSubstream>,
#[behaviour(ignore)]
/// The events generated by this behaviour to be consumed in the swarm poll.
events: Vec<BehaviourEvent<E>>,
events: Vec<BehaviourEvent>,
/// Logger for behaviour actions.
#[behaviour(ignore)]
log: slog::Logger,
}
impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> Behaviour<TSubstream, E> {
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub fn new(
local_key: &Keypair,
net_conf: &NetworkConfig,
@ -50,17 +55,25 @@ impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> Behaviour<TSubstream, E> {
) -> error::Result<Self> {
let local_peer_id = local_key.public().clone().into_peer_id();
let behaviour_log = log.new(o!());
let ping_config = PingConfig::new()
.with_timeout(Duration::from_secs(30))
.with_interval(Duration::from_secs(20))
.with_max_failures(NonZeroU32::new(2).expect("2 != 0"))
.with_keep_alive(false);
let identify = Identify::new(
"lighthouse/libp2p".into(),
version::version(),
local_key.public(),
);
Ok(Behaviour {
serenity_rpc: RPC::new(log),
eth2_rpc: RPC::new(log),
gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()),
discovery: Discovery::new(local_key, net_conf, log)?,
ping: Ping::new(ping_config),
identify,
events: Vec::new(),
log: behaviour_log,
})
@ -68,31 +81,20 @@ impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> Behaviour<TSubstream, E> {
}
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> NetworkBehaviourEventProcess<GossipsubEvent>
for Behaviour<TSubstream, E>
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubEvent>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, event: GossipsubEvent) {
match event {
GossipsubEvent::Message(gs_msg) => {
trace!(self.log, "Received GossipEvent"; "msg" => format!("{:?}", gs_msg));
let pubsub_message = match PubsubMessage::from_ssz_bytes(&gs_msg.data) {
//TODO: Punish peer on error
Err(e) => {
warn!(
self.log,
"Received undecodable message from Peer {:?} error", gs_msg.source;
"error" => format!("{:?}", e)
);
return;
}
Ok(msg) => msg,
};
let msg = PubsubMessage::from_topics(&gs_msg.topics, gs_msg.data);
self.events.push(BehaviourEvent::GossipMessage {
source: gs_msg.source,
topics: gs_msg.topics,
message: Box::new(pubsub_message),
message: msg,
});
}
GossipsubEvent::Subscribed { .. } => {}
@ -101,8 +103,8 @@ impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> NetworkBehaviourEventProces
}
}
impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> NetworkBehaviourEventProcess<RPCMessage>
for Behaviour<TSubstream, E>
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, event: RPCMessage) {
match event {
@ -119,19 +121,19 @@ impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> NetworkBehaviourEventProces
}
}
impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> NetworkBehaviourEventProcess<PingEvent>
for Behaviour<TSubstream, E>
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<PingEvent>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, _event: PingEvent) {
// not interested in ping responses at the moment.
}
}
impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> Behaviour<TSubstream, E> {
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
/// Consumes the events list when polled.
fn poll<TBehaviourIn>(
&mut self,
) -> Async<NetworkBehaviourAction<TBehaviourIn, BehaviourEvent<E>>> {
) -> Async<NetworkBehaviourAction<TBehaviourIn, BehaviourEvent>> {
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
}
@ -140,8 +142,36 @@ impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> Behaviour<TSubstream, E> {
}
}
impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> NetworkBehaviourEventProcess<Discv5Event>
for Behaviour<TSubstream, E>
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<IdentifyEvent>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, event: IdentifyEvent) {
match event {
IdentifyEvent::Identified {
peer_id, mut info, ..
} => {
if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES {
debug!(
self.log,
"More than 20 addresses have been identified, truncating"
);
info.listen_addrs.truncate(MAX_IDENTIFY_ADDRESSES);
}
debug!(self.log, "Identified Peer"; "Peer" => format!("{}", peer_id),
"Protocol Version" => info.protocol_version,
"Agent Version" => info.agent_version,
"Listening Addresses" => format!("{:?}", info.listen_addrs),
"Protocols" => format!("{:?}", info.protocols)
);
}
IdentifyEvent::Error { .. } => {}
IdentifyEvent::SendBack { .. } => {}
}
}
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<Discv5Event>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, _event: Discv5Event) {
// discv5 has no events to inject
@ -149,7 +179,7 @@ impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> NetworkBehaviourEventProces
}
/// Implements the combined behaviour for the libp2p service.
impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> Behaviour<TSubstream, E> {
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
/* Pubsub behaviour functions */
/// Subscribes to a gossipsub topic.
@ -158,7 +188,7 @@ impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> Behaviour<TSubstream, E> {
}
/// Publishes a message on the pubsub (gossipsub) behaviour.
pub fn publish(&mut self, topics: Vec<Topic>, message: PubsubMessage<E>) {
pub fn publish(&mut self, topics: Vec<Topic>, message: PubsubMessage) {
let message_bytes = ssz_encode(&message);
for topic in topics {
self.gossipsub.publish(topic, message_bytes.clone());
@ -169,7 +199,7 @@ impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> Behaviour<TSubstream, E> {
/// Sends an RPC Request/Response via the RPC protocol.
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.serenity_rpc.send_rpc(peer_id, rpc_event);
self.eth2_rpc.send_rpc(peer_id, rpc_event);
}
/* Discovery / Peer management functions */
@ -179,99 +209,60 @@ impl<TSubstream: AsyncRead + AsyncWrite, E: EthSpec> Behaviour<TSubstream, E> {
}
/// The types of events than can be obtained from polling the behaviour.
pub enum BehaviourEvent<E: EthSpec> {
pub enum BehaviourEvent {
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
PeerDisconnected(PeerId),
GossipMessage {
source: PeerId,
topics: Vec<TopicHash>,
message: Box<PubsubMessage<E>>,
message: PubsubMessage,
},
}
/// Messages that are passed to and from the pubsub (Gossipsub) behaviour.
#[derive(Debug, Clone, PartialEq)]
pub enum PubsubMessage<E: EthSpec> {
pub enum PubsubMessage {
/// Gossipsub message providing notification of a new block.
Block(BeaconBlock<E>),
Block(Vec<u8>),
/// Gossipsub message providing notification of a new attestation.
Attestation(Attestation<E>),
Attestation(Vec<u8>),
/// Gossipsub message from an unknown topic.
Unknown(Vec<u8>),
}
//TODO: Correctly encode/decode enums. Prefixing with integer for now.
impl<E: EthSpec> Encode for PubsubMessage<E> {
impl PubsubMessage {
/* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will
* need to be modified.
*
* Also note that a message can be associated with many topics. As soon as one of the topics is
* known we match. If none of the topics are known we return an unknown state.
*/
fn from_topics(topics: &Vec<TopicHash>, data: Vec<u8>) -> Self {
for topic in topics {
match topic.as_str() {
BEACON_BLOCK_TOPIC => return PubsubMessage::Block(data),
BEACON_ATTESTATION_TOPIC => return PubsubMessage::Attestation(data),
_ => {}
}
}
PubsubMessage::Unknown(data)
}
}
impl Encode for PubsubMessage {
fn is_ssz_fixed_len() -> bool {
false
}
fn ssz_append(&self, buf: &mut Vec<u8>) {
let offset = <u32 as Encode>::ssz_fixed_len() + <Vec<u8> as Encode>::ssz_fixed_len();
let mut encoder = ssz::SszEncoder::container(buf, offset);
match self {
PubsubMessage::Block(block_gossip) => {
encoder.append(&0_u32);
PubsubMessage::Block(inner)
| PubsubMessage::Attestation(inner)
| PubsubMessage::Unknown(inner) => {
// Encode the gossip as a Vec<u8>;
encoder.append(&block_gossip.as_ssz_bytes());
}
PubsubMessage::Attestation(attestation_gossip) => {
encoder.append(&1_u32);
// Encode the gossip as a Vec<u8>;
encoder.append(&attestation_gossip.as_ssz_bytes());
buf.append(&mut inner.as_ssz_bytes());
}
}
encoder.finalize();
}
}
impl<E: EthSpec> Decode for PubsubMessage<E> {
fn is_ssz_fixed_len() -> bool {
false
}
fn from_ssz_bytes(bytes: &[u8]) -> Result<Self, ssz::DecodeError> {
let mut builder = ssz::SszDecoderBuilder::new(&bytes);
builder.register_type::<u32>()?;
builder.register_type::<Vec<u8>>()?;
let mut decoder = builder.build()?;
let id: u32 = decoder.decode_next()?;
let body: Vec<u8> = decoder.decode_next()?;
match id {
0 => Ok(PubsubMessage::Block(BeaconBlock::from_ssz_bytes(&body)?)),
1 => Ok(PubsubMessage::Attestation(Attestation::from_ssz_bytes(
&body,
)?)),
_ => Err(DecodeError::BytesInvalid(
"Invalid PubsubMessage id".to_string(),
)),
}
}
}
#[cfg(test)]
mod test {
use super::*;
use types::*;
#[test]
fn ssz_encoding() {
let original = PubsubMessage::Block(BeaconBlock::<MainnetEthSpec>::empty(
&MainnetEthSpec::default_spec(),
));
let encoded = ssz_encode(&original);
let decoded = PubsubMessage::from_ssz_bytes(&encoded).unwrap();
assert_eq!(original, decoded);
}
}

View File

@ -1,12 +1,13 @@
use clap::ArgMatches;
use enr::Enr;
use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder};
use libp2p::Multiaddr;
use serde_derive::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::Duration;
/// The beacon node topic string to subscribe to.
pub const BEACON_PUBSUB_TOPIC: &str = "beacon_block";
pub const BEACON_BLOCK_TOPIC: &str = "beacon_block";
pub const BEACON_ATTESTATION_TOPIC: &str = "beacon_attestation";
pub const SHARD_TOPIC_PREFIX: &str = "shard";
@ -39,6 +40,9 @@ pub struct Config {
/// List of nodes to initially connect to.
pub boot_nodes: Vec<Enr>,
/// List of libp2p nodes to initially connect to.
pub libp2p_nodes: Vec<Multiaddr>,
/// Client version
pub client_version: String,
@ -60,12 +64,13 @@ impl Default for Config {
discovery_port: 9000,
max_peers: 10,
//TODO: Set realistic values for production
// Note: This defaults topics to plain strings. Not hashes
gs_config: GossipsubConfigBuilder::new()
.max_gossip_size(4_000_000)
.inactivity_timeout(Duration::from_secs(90))
.max_transmit_size(1_000_000)
.heartbeat_interval(Duration::from_secs(20))
.build(),
boot_nodes: vec![],
libp2p_nodes: vec![],
client_version: version::version(),
topics: Vec::new(),
}
@ -118,6 +123,21 @@ impl Config {
.collect::<Result<Vec<Enr>, _>>()?;
}
if let Some(libp2p_addresses_str) = args.value_of("libp2p-addresses") {
self.libp2p_nodes = libp2p_addresses_str
.split(',')
.map(|multiaddr| {
multiaddr
.parse()
.map_err(|_| format!("Invalid Multiaddr: {}", multiaddr))
})
.collect::<Result<Vec<Multiaddr>, _>>()?;
}
if let Some(topics_str) = args.value_of("topics") {
self.topics = topics_str.split(',').map(|s| s.into()).collect();
}
if let Some(discovery_address_str) = args.value_of("discovery-address") {
self.discovery_address = discovery_address_str
.parse()

View File

@ -4,13 +4,11 @@ use crate::{error, NetworkConfig};
/// Currently using discv5 for peer discovery.
///
use futures::prelude::*;
use libp2p::core::swarm::{
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use libp2p::core::{identity::Keypair, Multiaddr, PeerId, ProtocolsHandler};
use libp2p::core::{identity::Keypair, ConnectedPoint, Multiaddr, PeerId};
use libp2p::discv5::{Discv5, Discv5Event};
use libp2p::enr::{Enr, EnrBuilder, NodeId};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use slog::{debug, info, o, warn};
use std::collections::HashSet;
use std::fs::File;
@ -37,6 +35,9 @@ pub struct Discovery<TSubstream> {
/// The target number of connected peers on the libp2p interface.
max_peers: usize,
/// directory to save ENR to
enr_dir: String,
/// The delay between peer discovery searches.
peer_discovery_delay: Delay,
@ -54,9 +55,6 @@ pub struct Discovery<TSubstream> {
/// Logger for the discovery behaviour.
log: slog::Logger,
/// directory to save ENR to
enr_dir: String,
}
impl<TSubstream> Discovery<TSubstream> {

View File

@ -11,9 +11,9 @@ mod service;
pub use behaviour::PubsubMessage;
pub use config::{
Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC, SHARD_TOPIC_PREFIX,
Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC, SHARD_TOPIC_PREFIX,
};
pub use libp2p::floodsub::{Topic, TopicBuilder, TopicHash};
pub use libp2p::gossipsub::{Topic, TopicHash};
pub use libp2p::multiaddr;
pub use libp2p::Multiaddr;
pub use libp2p::{

View File

@ -5,23 +5,21 @@ use crate::rpc::protocol::{InboundFramed, OutboundFramed};
use core::marker::PhantomData;
use fnv::FnvHashMap;
use futures::prelude::*;
use libp2p::core::protocols_handler::{
use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade};
use libp2p::swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade};
use smallvec::SmallVec;
use std::time::{Duration, Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use types::EthSpec;
/// The time (in seconds) before a substream that is awaiting a response times out.
pub const RESPONSE_TIMEOUT: u64 = 9;
/// Implementation of `ProtocolsHandler` for the RPC protocol.
pub struct RPCHandler<TSubstream, E>
pub struct RPCHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
E: EthSpec,
{
/// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<RPCProtocol>,
@ -56,8 +54,8 @@ where
/// After the given duration has elapsed, an inactive connection will shutdown.
inactive_timeout: Duration,
/// Phantom EthSpec.
_phantom: PhantomData<E>,
/// Marker to pin the generic stream.
_phantom: PhantomData<TSubstream>,
}
/// An outbound substream is waiting a response from the user.
@ -90,10 +88,9 @@ where
},
}
impl<TSubstream, E> RPCHandler<TSubstream, E>
impl<TSubstream> RPCHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
E: EthSpec,
{
pub fn new(
listen_protocol: SubstreamProtocol<RPCProtocol>,
@ -145,20 +142,18 @@ where
}
}
impl<TSubstream, E> Default for RPCHandler<TSubstream, E>
impl<TSubstream> Default for RPCHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
E: EthSpec,
{
fn default() -> Self {
RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(30))
}
}
impl<TSubstream, E> ProtocolsHandler for RPCHandler<TSubstream, E>
impl<TSubstream> ProtocolsHandler for RPCHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
E: EthSpec,
{
type InEvent = RPCEvent;
type OutEvent = RPCEvent;
@ -273,7 +268,11 @@ where
Self::Error,
> {
if let Some(err) = self.pending_error.take() {
return Err(err);
// Returning an error here will result in dropping any peer that doesn't support any of
// the RPC protocols. For our immediate purposes we permit this and simply log that an
// upgrade was not supported.
// TODO: Add a logger to the handler for trace output.
dbg!(&err);
}
// return any events that need to be reported

View File

@ -6,9 +6,9 @@
use futures::prelude::*;
use handler::RPCHandler;
use libp2p::core::protocols_handler::ProtocolsHandler;
use libp2p::core::swarm::{
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
use libp2p::core::ConnectedPoint;
use libp2p::swarm::{
protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use libp2p::{Multiaddr, PeerId};
pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse, RequestId};
@ -16,7 +16,6 @@ pub use protocol::{RPCError, RPCProtocol, RPCRequest};
use slog::o;
use std::marker::PhantomData;
use tokio::io::{AsyncRead, AsyncWrite};
use types::EthSpec;
pub(crate) mod codec;
mod handler;
@ -50,16 +49,16 @@ impl RPCEvent {
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
/// logic.
pub struct RPC<TSubstream, E: EthSpec> {
pub struct RPC<TSubstream> {
/// Queue of events to processed.
events: Vec<NetworkBehaviourAction<RPCEvent, RPCMessage>>,
/// Pins the generic substream.
marker: PhantomData<(TSubstream, E)>,
marker: PhantomData<(TSubstream)>,
/// Slog logger for RPC behaviour.
_log: slog::Logger,
}
impl<TSubstream, E: EthSpec> RPC<TSubstream, E> {
impl<TSubstream> RPC<TSubstream> {
pub fn new(log: &slog::Logger) -> Self {
let log = log.new(o!("Service" => "Libp2p-RPC"));
RPC {
@ -80,12 +79,11 @@ impl<TSubstream, E: EthSpec> RPC<TSubstream, E> {
}
}
impl<TSubstream, E> NetworkBehaviour for RPC<TSubstream, E>
impl<TSubstream> NetworkBehaviour for RPC<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
E: EthSpec,
{
type ProtocolsHandler = RPCHandler<TSubstream, E>;
type ProtocolsHandler = RPCHandler<TSubstream>;
type OutEvent = RPCMessage;
fn new_handler(&mut self) -> Self::ProtocolsHandler {

View File

@ -8,7 +8,7 @@ use futures::{
future::{self, FutureResult},
sink, stream, Sink, Stream,
};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
use std::io;
use std::time::Duration;
use tokio::codec::Framed;
@ -28,24 +28,22 @@ const REQUEST_TIMEOUT: u64 = 3;
pub struct RPCProtocol;
impl UpgradeInfo for RPCProtocol {
type Info = RawProtocolId;
type Info = ProtocolId;
type InfoIter = Vec<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
vec![
ProtocolId::new("hello", "1.0.0", "ssz").into(),
ProtocolId::new("goodbye", "1.0.0", "ssz").into(),
ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into(),
ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into(),
ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into(),
ProtocolId::new("hello", "1.0.0", "ssz"),
ProtocolId::new("goodbye", "1.0.0", "ssz"),
ProtocolId::new("beacon_block_roots", "1.0.0", "ssz"),
ProtocolId::new("beacon_block_headers", "1.0.0", "ssz"),
ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz"),
]
}
}
/// The raw protocol id sent over the wire.
type RawProtocolId = Vec<u8>;
/// Tracks the types in a protocol id.
#[derive(Clone)]
pub struct ProtocolId {
/// The rpc message type/name.
pub message_name: String,
@ -55,44 +53,31 @@ pub struct ProtocolId {
/// The encoding of the RPC.
pub encoding: String,
/// The protocol id that is formed from the above fields.
protocol_id: String,
}
/// An RPC protocol ID.
impl ProtocolId {
pub fn new(message_name: &str, version: &str, encoding: &str) -> Self {
let protocol_id = format!(
"{}/{}/{}/{}",
PROTOCOL_PREFIX, message_name, version, encoding
);
ProtocolId {
message_name: message_name.into(),
version: version.into(),
encoding: encoding.into(),
protocol_id,
}
}
/// Converts a raw RPC protocol id string into an `RPCProtocolId`
pub fn from_bytes(bytes: &[u8]) -> Result<Self, RPCError> {
let protocol_string = String::from_utf8(bytes.to_vec())
.map_err(|_| RPCError::InvalidProtocol("Invalid protocol Id"))?;
let protocol_list: Vec<&str> = protocol_string.as_str().split('/').take(7).collect();
if protocol_list.len() != 7 {
return Err(RPCError::InvalidProtocol("Not enough '/'"));
}
Ok(ProtocolId {
message_name: protocol_list[4].into(),
version: protocol_list[5].into(),
encoding: protocol_list[6].into(),
})
}
}
impl Into<RawProtocolId> for ProtocolId {
fn into(self) -> RawProtocolId {
format!(
"{}/{}/{}/{}",
PROTOCOL_PREFIX, self.message_name, self.version, self.encoding
)
.as_bytes()
.to_vec()
impl ProtocolName for ProtocolId {
fn protocol_name(&self) -> &[u8] {
self.protocol_id.as_bytes()
}
}
@ -127,16 +112,11 @@ where
fn upgrade_inbound(
self,
socket: upgrade::Negotiated<TSocket>,
protocol: RawProtocolId,
protocol: ProtocolId,
) -> Self::Future {
// TODO: Verify this
let protocol_id =
ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols");
match protocol_id.encoding.as_str() {
match protocol.encoding.as_str() {
"ssz" | _ => {
let ssz_codec =
BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, MAX_RPC_SIZE));
let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE));
let codec = InboundCodec::SSZ(ssz_codec);
Framed::new(socket, codec)
.into_future()
@ -171,7 +151,7 @@ pub enum RPCRequest {
}
impl UpgradeInfo for RPCRequest {
type Info = RawProtocolId;
type Info = ProtocolId;
type InfoIter = Vec<Self::Info>;
// add further protocols as we support more encodings/versions
@ -182,22 +162,25 @@ impl UpgradeInfo for RPCRequest {
/// Implements the encoding per supported protocol for RPCRequest.
impl RPCRequest {
pub fn supported_protocols(&self) -> Vec<RawProtocolId> {
pub fn supported_protocols(&self) -> Vec<ProtocolId> {
match self {
// add more protocols when versions/encodings are supported
RPCRequest::Hello(_) => vec![ProtocolId::new("hello", "1.0.0", "ssz").into()],
RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1.0.0", "ssz").into()],
RPCRequest::Hello(_) => vec![
ProtocolId::new("hello", "1.0.0", "ssz"),
ProtocolId::new("goodbye", "1.0.0", "ssz"),
],
RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1.0.0", "ssz")],
RPCRequest::BeaconBlockRoots(_) => {
vec![ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into()]
vec![ProtocolId::new("beacon_block_roots", "1.0.0", "ssz")]
}
RPCRequest::BeaconBlockHeaders(_) => {
vec![ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into()]
vec![ProtocolId::new("beacon_block_headers", "1.0.0", "ssz")]
}
RPCRequest::BeaconBlockBodies(_) => {
vec![ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into()]
vec![ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz")]
}
RPCRequest::BeaconChainState(_) => {
vec![ProtocolId::new("beacon_block_state", "1.0.0", "ssz").into()]
vec![ProtocolId::new("beacon_block_state", "1.0.0", "ssz")]
}
}
}
@ -230,12 +213,9 @@ where
socket: upgrade::Negotiated<TSocket>,
protocol: Self::Info,
) -> Self::Future {
let protocol_id =
ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols");
match protocol_id.encoding.as_str() {
match protocol.encoding.as_str() {
"ssz" | _ => {
let ssz_codec = BaseOutboundCodec::new(SSZOutboundCodec::new(protocol_id, 4096));
let ssz_codec = BaseOutboundCodec::new(SSZOutboundCodec::new(protocol, 4096));
let codec = OutboundCodec::SSZ(ssz_codec);
Framed::new(socket, codec).send(self)
}

View File

@ -3,8 +3,8 @@ use crate::error;
use crate::multiaddr::Protocol;
use crate::rpc::RPCEvent;
use crate::NetworkConfig;
use crate::{TopicBuilder, TopicHash};
use crate::{BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC};
use crate::{Topic, TopicHash};
use crate::{BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC};
use futures::prelude::*;
use futures::Stream;
use libp2p::core::{
@ -21,25 +21,24 @@ use std::fs::File;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
use std::time::Duration;
use types::EthSpec;
type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>;
type Libp2pBehaviour<E> = Behaviour<Substream<StreamMuxerBox>, E>;
type Libp2pBehaviour = Behaviour<Substream<StreamMuxerBox>>;
const NETWORK_KEY_FILENAME: &str = "key";
/// The configuration and state of the libp2p components for the beacon node.
pub struct Service<E: EthSpec> {
pub struct Service {
/// The libp2p Swarm handler.
//TODO: Make this private
pub swarm: Swarm<Libp2pStream, Libp2pBehaviour<E>>,
pub swarm: Swarm<Libp2pStream, Libp2pBehaviour>,
/// This node's PeerId.
_local_peer_id: PeerId,
/// The libp2p logger handle.
pub log: slog::Logger,
}
impl<E: EthSpec> Service<E> {
impl Service {
pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result<Self> {
debug!(log, "Network-libp2p Service starting");
@ -76,18 +75,35 @@ impl<E: EthSpec> Service<E> {
),
};
// attempt to connect to user-input libp2p nodes
for multiaddr in config.libp2p_nodes {
match Swarm::dial_addr(&mut swarm, multiaddr.clone()) {
Ok(()) => debug!(log, "Dialing libp2p node: {}", multiaddr),
Err(err) => debug!(
log,
"Could not connect to node: {} error: {:?}", multiaddr, err
),
};
}
// subscribe to default gossipsub topics
let mut topics = vec![];
//TODO: Handle multiple shard attestations. For now we simply use a separate topic for
//attestations
topics.push(BEACON_ATTESTATION_TOPIC.to_string());
topics.push(BEACON_PUBSUB_TOPIC.to_string());
topics.append(&mut config.topics.clone());
// attestations
topics.push(Topic::new(BEACON_ATTESTATION_TOPIC.into()));
topics.push(Topic::new(BEACON_BLOCK_TOPIC.into()));
topics.append(
&mut config
.topics
.iter()
.cloned()
.map(|s| Topic::new(s))
.collect(),
);
let mut subscribed_topics = vec![];
for topic in topics {
let t = TopicBuilder::new(topic.clone()).build();
if swarm.subscribe(t) {
if swarm.subscribe(topic.clone()) {
trace!(log, "Subscribed to topic: {:?}", topic);
subscribed_topics.push(topic);
} else {
@ -104,8 +120,8 @@ impl<E: EthSpec> Service<E> {
}
}
impl<E: EthSpec> Stream for Service<E> {
type Item = Libp2pEvent<E>;
impl Stream for Service {
type Item = Libp2pEvent;
type Error = crate::error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -119,7 +135,7 @@ impl<E: EthSpec> Stream for Service<E> {
topics,
message,
} => {
trace!(self.log, "Pubsub message received: {:?}", message);
trace!(self.log, "Gossipsub message received"; "Message" => format!("{:?}", message));
return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage {
source,
topics,
@ -179,7 +195,7 @@ fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox)
}
/// Events that can be obtained from polling the Libp2p Service.
pub enum Libp2pEvent<E: EthSpec> {
pub enum Libp2pEvent {
/// An RPC response request has been received on the swarm.
RPC(PeerId, RPCEvent),
/// Initiated the connection to a new peer.
@ -190,7 +206,7 @@ pub enum Libp2pEvent<E: EthSpec> {
PubsubMessage {
source: PeerId,
topics: Vec<TopicHash>,
message: Box<PubsubMessage<E>>,
message: PubsubMessage,
},
}

View File

@ -64,7 +64,7 @@ fn handle_fork<T: BeaconChainTypes + 'static>(req: &mut Request) -> IronResult<R
let response = json!({
"fork": beacon_chain.head().beacon_state.fork,
"chain_id": beacon_chain.spec.chain_id
"network_id": beacon_chain.spec.network_id
});
Ok(Response::with((Status::Ok, response.to_string())))

View File

@ -76,7 +76,7 @@ pub fn create_iron_http_server<T: BeaconChainTypes + 'static>(
pub fn start_service<T: BeaconChainTypes + 'static>(
config: &HttpServerConfig,
executor: &TaskExecutor,
_network_chan: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
_network_chan: mpsc::UnboundedSender<NetworkMessage>,
beacon_chain: Arc<BeaconChain<T>>,
db_path: PathBuf,
metrics_registry: Registry,

View File

@ -14,7 +14,7 @@ use slog::{debug, warn};
use ssz::{Decode, DecodeError};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{BeaconBlockHeader, EthSpec};
use types::{Attestation, BeaconBlock, BeaconBlockHeader};
/// Handles messages received from the network and client and organises syncing.
pub struct MessageHandler<T: BeaconChainTypes> {
@ -23,14 +23,14 @@ pub struct MessageHandler<T: BeaconChainTypes> {
/// The syncing framework.
sync: SimpleSync<T>,
/// The context required to send messages to, and process messages from peers.
network_context: NetworkContext<T::EthSpec>,
network_context: NetworkContext,
/// The `MessageHandler` logger.
log: slog::Logger,
}
/// Types of messages the handler can receive.
#[derive(Debug)]
pub enum HandlerMessage<E: EthSpec> {
pub enum HandlerMessage {
/// We have initiated a connection to a new peer.
PeerDialed(PeerId),
/// Peer has disconnected,
@ -38,17 +38,17 @@ pub enum HandlerMessage<E: EthSpec> {
/// An RPC response/request has been received.
RPC(PeerId, RPCEvent),
/// A gossip message has been received.
PubsubMessage(PeerId, Box<PubsubMessage<E>>),
PubsubMessage(PeerId, PubsubMessage),
}
impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
/// Initializes and runs the MessageHandler.
pub fn spawn(
beacon_chain: Arc<BeaconChain<T>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
executor: &tokio::runtime::TaskExecutor,
log: slog::Logger,
) -> error::Result<mpsc::UnboundedSender<HandlerMessage<T::EthSpec>>> {
) -> error::Result<mpsc::UnboundedSender<HandlerMessage>> {
debug!(log, "Service starting");
let (handler_send, handler_recv) = mpsc::unbounded_channel();
@ -78,7 +78,7 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
}
/// Handle all messages incoming from the network service.
fn handle_message(&mut self, message: HandlerMessage<T::EthSpec>) {
fn handle_message(&mut self, message: HandlerMessage) {
match message {
// we have initiated a connection to a peer
HandlerMessage::PeerDialed(peer_id) => {
@ -94,7 +94,7 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
}
// we have received an RPC message request/response
HandlerMessage::PubsubMessage(peer_id, gossip) => {
self.handle_gossip(peer_id, *gossip);
self.handle_gossip(peer_id, gossip);
}
}
}
@ -218,6 +218,62 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
}
}
/// Handle various RPC errors
fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
//TODO: Handle error correctly
warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error));
}
/// Handle RPC messages
fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) {
match gossip_message {
PubsubMessage::Block(message) => match self.decode_gossip_block(message) {
Err(e) => {
debug!(self.log, "Invalid Gossiped Beacon Block"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e));
}
Ok(block) => {
let _should_forward_on =
self.sync
.on_block_gossip(peer_id, block, &mut self.network_context);
}
},
PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) {
Err(e) => {
debug!(self.log, "Invalid Gossiped Attestation"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e));
}
Ok(attestation) => {
self.sync
.on_attestation_gossip(peer_id, attestation, &mut self.network_context)
}
},
PubsubMessage::Unknown(message) => {
// Received a message from an unknown topic. Ignore for now
debug!(self.log, "Unknown Gossip Message"; "Peer" => format!("{}", peer_id), "Message" => format!("{:?}", message));
}
}
}
/* Decoding of blocks and attestations from the network.
*
* TODO: Apply efficient decoding/verification of these objects
*/
fn decode_gossip_block(
&self,
beacon_block: Vec<u8>,
) -> Result<BeaconBlock<T::EthSpec>, DecodeError> {
//TODO: Apply verification before decoding.
BeaconBlock::from_ssz_bytes(&beacon_block)
}
fn decode_gossip_attestation(
&self,
beacon_block: Vec<u8>,
) -> Result<Attestation<T::EthSpec>, DecodeError> {
//TODO: Apply verification before decoding.
Attestation::from_ssz_bytes(&beacon_block)
}
/// Verifies and decodes the ssz-encoded block bodies received from peers.
fn decode_block_bodies(
&self,
@ -241,39 +297,18 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
//TODO: Implement faster header verification before decoding entirely
Vec::from_ssz_bytes(&headers_response.headers)
}
/// Handle various RPC errors
fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
//TODO: Handle error correctly
warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error));
}
/// Handle RPC messages
fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage<T::EthSpec>) {
match gossip_message {
PubsubMessage::Block(message) => {
let _should_forward_on =
self.sync
.on_block_gossip(peer_id, message, &mut self.network_context);
}
PubsubMessage::Attestation(message) => {
self.sync
.on_attestation_gossip(peer_id, message, &mut self.network_context)
}
}
}
}
// TODO: RPC Rewrite makes this struct fairly pointless
pub struct NetworkContext<E: EthSpec> {
pub struct NetworkContext {
/// The network channel to relay messages to the Network service.
network_send: mpsc::UnboundedSender<NetworkMessage<E>>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
/// The `MessageHandler` logger.
log: slog::Logger,
}
impl<E: EthSpec> NetworkContext<E> {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage<E>>, log: slog::Logger) -> Self {
impl NetworkContext {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>, log: slog::Logger) -> Self {
Self { network_send, log }
}

View File

@ -14,13 +14,12 @@ use slog::{debug, info, o, trace};
use std::sync::Arc;
use tokio::runtime::TaskExecutor;
use tokio::sync::{mpsc, oneshot};
use types::EthSpec;
/// Service that handles communication between internal services and the eth2_libp2p network service.
pub struct Service<T: BeaconChainTypes> {
libp2p_service: Arc<Mutex<LibP2PService<T::EthSpec>>>,
libp2p_service: Arc<Mutex<LibP2PService>>,
_libp2p_exit: oneshot::Sender<()>,
_network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
_network_send: mpsc::UnboundedSender<NetworkMessage>,
_phantom: PhantomData<T>, //message_handler: MessageHandler,
//message_handler_send: Sender<HandlerMessage>
}
@ -31,9 +30,9 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
config: &NetworkConfig,
executor: &TaskExecutor,
log: slog::Logger,
) -> error::Result<(Arc<Self>, mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>)> {
) -> error::Result<(Arc<Self>, mpsc::UnboundedSender<NetworkMessage>)> {
// build the network channel
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage<_>>();
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
// launch message handler thread
let message_handler_log = log.new(o!("Service" => "MessageHandler"));
let message_handler_send = MessageHandler::spawn(
@ -65,15 +64,15 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
Ok((Arc::new(network_service), network_send))
}
pub fn libp2p_service(&self) -> Arc<Mutex<LibP2PService<T::EthSpec>>> {
pub fn libp2p_service(&self) -> Arc<Mutex<LibP2PService>> {
self.libp2p_service.clone()
}
}
fn spawn_service<E: EthSpec>(
libp2p_service: Arc<Mutex<LibP2PService<E>>>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage<E>>,
message_handler_send: mpsc::UnboundedSender<HandlerMessage<E>>,
fn spawn_service(
libp2p_service: Arc<Mutex<LibP2PService>>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
executor: &TaskExecutor,
log: slog::Logger,
) -> error::Result<tokio::sync::oneshot::Sender<()>> {
@ -99,10 +98,10 @@ fn spawn_service<E: EthSpec>(
}
//TODO: Potentially handle channel errors
fn network_service<E: EthSpec>(
libp2p_service: Arc<Mutex<LibP2PService<E>>>,
mut network_recv: mpsc::UnboundedReceiver<NetworkMessage<E>>,
mut message_handler_send: mpsc::UnboundedSender<HandlerMessage<E>>,
fn network_service(
libp2p_service: Arc<Mutex<LibP2PService>>,
mut network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
mut message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
log: slog::Logger,
) -> impl futures::Future<Item = (), Error = eth2_libp2p::error::Error> {
futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> {
@ -119,7 +118,7 @@ fn network_service<E: EthSpec>(
},
NetworkMessage::Publish { topics, message } => {
debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics));
libp2p_service.lock().swarm.publish(topics, *message);
libp2p_service.lock().swarm.publish(topics, message);
}
},
Ok(Async::NotReady) => break,
@ -176,14 +175,14 @@ fn network_service<E: EthSpec>(
/// Types of messages that the network service can receive.
#[derive(Debug)]
pub enum NetworkMessage<E: EthSpec> {
pub enum NetworkMessage {
/// Send a message to libp2p service.
//TODO: Define typing for messages across the wire
Send(PeerId, OutgoingMessage),
/// Publish a message to pubsub mechanism.
Publish {
topics: Vec<Topic>,
message: Box<PubsubMessage<E>>,
message: PubsubMessage,
},
}

View File

@ -123,7 +123,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
/// Handle the connection of a new peer.
///
/// Sends a `Hello` message to the peer.
pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext<T::EthSpec>) {
pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) {
info!(self.log, "PeerConnected"; "peer" => format!("{:?}", peer_id));
network.send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain)));
@ -137,7 +137,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
peer_id: PeerId,
request_id: RequestId,
hello: HelloMessage,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id));
@ -156,7 +156,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
hello: HelloMessage,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id));
@ -171,7 +171,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
hello: HelloMessage,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
let remote = PeerSyncInfo::from(hello);
let local = PeerSyncInfo::from(&self.chain);
@ -278,7 +278,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
peer_id: PeerId,
request_id: RequestId,
req: BeaconBlockRootsRequest,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
let state = &self.chain.head().beacon_state;
@ -325,7 +325,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
res: BeaconBlockRootsResponse,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(
self.log,
@ -389,7 +389,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
peer_id: PeerId,
request_id: RequestId,
req: BeaconBlockHeadersRequest,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
let state = &self.chain.head().beacon_state;
@ -441,7 +441,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
headers: Vec<BeaconBlockHeader>,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(
self.log,
@ -473,7 +473,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
peer_id: PeerId,
request_id: RequestId,
req: BeaconBlockBodiesRequest,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
let block_bodies: Vec<BeaconBlockBody<_>> = req
.block_roots
@ -519,7 +519,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
res: DecodedBeaconBlockBodiesResponse<T::EthSpec>,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(
self.log,
@ -558,7 +558,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
block: BeaconBlock<T::EthSpec>,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) -> bool {
if let Some(outcome) =
self.process_block(peer_id.clone(), block.clone(), network, &"gossip")
@ -628,7 +628,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
_peer_id: PeerId,
msg: Attestation<T::EthSpec>,
_network: &mut NetworkContext<T::EthSpec>,
_network: &mut NetworkContext,
) {
match self.chain.process_attestation(msg) {
Ok(()) => info!(self.log, "ImportedAttestation"; "source" => "gossip"),
@ -643,7 +643,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
req: BeaconBlockRootsRequest,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
// Potentially set state to sync.
if self.state == SyncState::Idle && req.count > SLOT_IMPORT_TOLERANCE {
@ -667,7 +667,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
req: BeaconBlockHeadersRequest,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(
self.log,
@ -684,7 +684,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
req: BeaconBlockBodiesRequest,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(
self.log,
@ -720,7 +720,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
block_root: Hash256,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
source: &str,
) -> Option<BlockProcessingOutcome> {
match self.import_queue.attempt_complete_block(block_root) {
@ -813,7 +813,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
block: BeaconBlock<T::EthSpec>,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
source: &str,
) -> Option<BlockProcessingOutcome> {
let processing_result = self.chain.process_block(block.clone());
@ -915,9 +915,9 @@ fn hello_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) -> HelloMes
let state = &beacon_chain.head().beacon_state;
HelloMessage {
//TODO: Correctly define the chain/network id
network_id: spec.chain_id,
chain_id: u64::from(spec.chain_id),
network_id: spec.network_id,
//TODO: Correctly define the chain id
chain_id: spec.network_id as u64,
latest_finalized_root: state.finalized_checkpoint.root,
latest_finalized_epoch: state.finalized_checkpoint.epoch,
best_root: beacon_chain.head().beacon_block_root,

View File

@ -1,6 +1,6 @@
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::PubsubMessage;
use eth2_libp2p::TopicBuilder;
use eth2_libp2p::Topic;
use eth2_libp2p::BEACON_ATTESTATION_TOPIC;
use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
@ -11,7 +11,7 @@ use protos::services::{
};
use protos::services_grpc::AttestationService;
use slog::{error, info, trace, warn};
use ssz::{ssz_encode, Decode};
use ssz::{ssz_encode, Decode, Encode};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::Attestation;
@ -19,7 +19,7 @@ use types::Attestation;
#[derive(Clone)]
pub struct AttestationServiceInstance<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>,
pub network_chan: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
pub network_chan: mpsc::UnboundedSender<NetworkMessage>,
pub log: slog::Logger,
}
@ -144,13 +144,13 @@ impl<T: BeaconChainTypes> AttestationService for AttestationServiceInstance<T> {
);
// valid attestation, propagate to the network
let topic = TopicBuilder::new(BEACON_ATTESTATION_TOPIC).build();
let message = PubsubMessage::Attestation(attestation);
let topic = Topic::new(BEACON_ATTESTATION_TOPIC.into());
let message = PubsubMessage::Attestation(attestation.as_ssz_bytes());
self.network_chan
.try_send(NetworkMessage::Publish {
topics: vec![topic],
message: Box::new(message),
message: message,
})
.unwrap_or_else(|e| {
error!(

View File

@ -1,6 +1,6 @@
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::BEACON_PUBSUB_TOPIC;
use eth2_libp2p::{PubsubMessage, TopicBuilder};
use eth2_libp2p::BEACON_BLOCK_TOPIC;
use eth2_libp2p::{PubsubMessage, Topic};
use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use network::NetworkMessage;
@ -11,7 +11,7 @@ use protos::services::{
use protos::services_grpc::BeaconBlockService;
use slog::Logger;
use slog::{error, info, trace, warn};
use ssz::{ssz_encode, Decode};
use ssz::{ssz_encode, Decode, Encode};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{BeaconBlock, Signature, Slot};
@ -19,7 +19,7 @@ use types::{BeaconBlock, Signature, Slot};
#[derive(Clone)]
pub struct BeaconBlockServiceInstance<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>,
pub network_chan: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
pub network_chan: mpsc::UnboundedSender<NetworkMessage>,
pub log: Logger,
}
@ -106,14 +106,14 @@ impl<T: BeaconChainTypes> BeaconBlockService for BeaconBlockServiceInstance<T> {
);
// get the network topic to send on
let topic = TopicBuilder::new(BEACON_PUBSUB_TOPIC).build();
let message = PubsubMessage::Block(block);
let topic = Topic::new(BEACON_BLOCK_TOPIC.into());
let message = PubsubMessage::Block(block.as_ssz_bytes());
// Publish the block to the p2p network via gossipsub.
self.network_chan
.try_send(NetworkMessage::Publish {
topics: vec![topic],
message: Box::new(message),
message: message,
})
.unwrap_or_else(|e| {
error!(

View File

@ -37,7 +37,7 @@ impl<T: BeaconChainTypes> BeaconNodeService for BeaconNodeServiceInstance<T> {
node_info.set_fork(fork);
node_info.set_genesis_time(genesis_time);
node_info.set_genesis_slot(spec.genesis_slot.as_u64());
node_info.set_chain_id(u32::from(spec.chain_id));
node_info.set_network_id(u32::from(spec.network_id));
// send the node_info the requester
let error_log = self.log.clone();

View File

@ -25,7 +25,7 @@ use tokio::sync::mpsc;
pub fn start_server<T: BeaconChainTypes + Clone + 'static>(
config: &RPCConfig,
executor: &TaskExecutor,
network_chan: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
network_chan: mpsc::UnboundedSender<NetworkMessage>,
beacon_chain: Arc<BeaconChain<T>>,
log: &slog::Logger,
) -> exit_future::Signal {

View File

@ -4,7 +4,7 @@ use clap::{App, Arg};
use client::{ClientConfig, Eth2Config};
use env_logger::{Builder, Env};
use eth2_config::{read_from_file, write_to_file};
use slog::{crit, o, Drain, Level};
use slog::{crit, o, warn, Drain, Level};
use std::fs;
use std::path::PathBuf;
@ -52,10 +52,17 @@ fn main() {
.arg(
Arg::with_name("listen-address")
.long("listen-address")
.value_name("Address")
.value_name("ADDRESS")
.help("The address lighthouse will listen for UDP and TCP connections. (default 127.0.0.1).")
.takes_value(true),
)
.arg(
Arg::with_name("port")
.long("port")
.value_name("PORT")
.help("The TCP/UDP port to listen on. The UDP port can be modified by the --discovery-port flag.")
.takes_value(true),
)
.arg(
Arg::with_name("maxpeers")
.long("maxpeers")
@ -70,27 +77,34 @@ fn main() {
.help("One or more comma-delimited base64-encoded ENR's to bootstrap the p2p network.")
.takes_value(true),
)
.arg(
Arg::with_name("port")
.long("port")
.value_name("Lighthouse Port")
.help("The TCP/UDP port to listen on. The UDP port can be modified by the --discovery-port flag.")
.takes_value(true),
)
.arg(
Arg::with_name("discovery-port")
.long("disc-port")
.value_name("DiscoveryPort")
.value_name("PORT")
.help("The discovery UDP port.")
.takes_value(true),
)
.arg(
Arg::with_name("discovery-address")
.long("discovery-address")
.value_name("Address")
.value_name("ADDRESS")
.help("The IP address to broadcast to other peers on how to reach this node.")
.takes_value(true),
)
.arg(
Arg::with_name("topics")
.long("topics")
.value_name("STRING")
.help("One or more comma-delimited gossipsub topic strings to subscribe to.")
.takes_value(true),
)
.arg(
Arg::with_name("libp2p-addresses")
.long("libp2p-addresses")
.value_name("MULTIADDR")
.help("One or more comma-delimited multiaddrs to manually connect to a libp2p peer without an ENR.")
.takes_value(true),
)
/*
* gRPC parameters.
*/
@ -136,6 +150,7 @@ fn main() {
.help("Listen port for the HTTP server.")
.takes_value(true),
)
/* Client related arguments */
.arg(
Arg::with_name("api")
.long("api")
@ -178,12 +193,9 @@ fn main() {
.long("default-spec")
.value_name("TITLE")
.short("default-spec")
.help("Specifies the default eth2 spec to be used. Overridden by any spec loaded
from disk. A spec will be written to disk after this flag is used, so it is
primarily used for creating eth2 spec files.")
.help("Specifies the default eth2 spec to be used. This will override any spec written to disk and will therefore be used by default in future instances.")
.takes_value(true)
.possible_values(&["mainnet", "minimal"])
.default_value("minimal"),
.possible_values(&["mainnet", "minimal", "interop"])
)
.arg(
Arg::with_name("recent-genesis")
@ -202,7 +214,7 @@ fn main() {
.help("The title of the spec constants for chain config.")
.takes_value(true)
.possible_values(&["info", "debug", "trace", "warn", "error", "crit"])
.default_value("info"),
.default_value("trace"),
)
.arg(
Arg::with_name("verbosity")
@ -301,29 +313,58 @@ fn main() {
let eth2_config_path = data_dir.join(ETH2_CONFIG_FILENAME);
// Attempt to load the `Eth2Config` from file.
// Initialise the `Eth2Config`.
//
// If the file doesn't exist, create a default one depending on the CLI flags.
let mut eth2_config = match read_from_file::<Eth2Config>(eth2_config_path.clone()) {
Ok(Some(c)) => c,
Ok(None) => {
let default = match matches.value_of("default-spec") {
Some("mainnet") => Eth2Config::mainnet(),
Some("minimal") => Eth2Config::minimal(),
_ => unreachable!(), // Guarded by slog.
};
if let Err(e) = write_to_file(eth2_config_path, &default) {
crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e));
return;
}
default
}
// If a CLI parameter is set, overwrite any config file present.
// If a parameter is not set, use either the config file present or default to minimal.
let cli_config = match matches.value_of("default-spec") {
Some("mainnet") => Some(Eth2Config::mainnet()),
Some("minimal") => Some(Eth2Config::minimal()),
Some("interop") => Some(Eth2Config::interop()),
_ => None,
};
// if a CLI flag is specified, write the new config if it doesn't exist,
// otherwise notify the user that the file will not be written.
let eth2_config_from_file = match read_from_file::<Eth2Config>(eth2_config_path.clone()) {
Ok(config) => config,
Err(e) => {
crit!(log, "Failed to load/generate an Eth2Config"; "error" => format!("{:?}", e));
crit!(log, "Failed to read the Eth2Config from file"; "error" => format!("{:?}", e));
return;
}
};
let mut eth2_config = {
if let Some(cli_config) = cli_config {
if eth2_config_from_file.is_none() {
// write to file if one doesn't exist
if let Err(e) = write_to_file(eth2_config_path, &cli_config) {
crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e));
return;
}
} else {
warn!(
log,
"Eth2Config file exists. Configuration file is ignored, using default"
);
}
cli_config
} else {
// CLI config not specified, read from disk
match eth2_config_from_file {
Some(config) => config,
None => {
// set default to minimal
let eth2_config = Eth2Config::minimal();
if let Err(e) = write_to_file(eth2_config_path, &eth2_config) {
crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e));
return;
}
eth2_config
}
}
}
};
// Update the eth2 config with any CLI flags.
match eth2_config.apply_cli_args(&matches) {
Ok(()) => (),
@ -333,6 +374,12 @@ fn main() {
}
};
// check to ensure the spec constants between the client and eth2_config match
if eth2_config.spec_constants != client_config.spec_constants {
crit!(log, "Specification constants do not match."; "client_config" => format!("{}", client_config.spec_constants), "eth2_config" => format!("{}", eth2_config.spec_constants));
return;
}
// Start the node using a `tokio` executor.
match run::run_beacon_node(client_config, eth2_config, &log) {
Ok(_) => {}

View File

@ -13,7 +13,7 @@ use tokio::runtime::Builder;
use tokio::runtime::Runtime;
use tokio::runtime::TaskExecutor;
use tokio_timer::clock::Clock;
use types::{MainnetEthSpec, MinimalEthSpec};
use types::{InteropEthSpec, MainnetEthSpec, MinimalEthSpec};
/// Reads the configuration and initializes a `BeaconChain` with the required types and parameters.
///
@ -90,6 +90,22 @@ pub fn run_beacon_node(
runtime,
log,
),
("disk", "interop") => run::<ClientType<DiskStore, InteropEthSpec>>(
&db_path,
client_config,
eth2_config,
executor,
runtime,
log,
),
("memory", "interop") => run::<ClientType<MemoryStore, InteropEthSpec>>(
&db_path,
client_config,
eth2_config,
executor,
runtime,
log,
),
(db_type, spec) => {
error!(log, "Unknown runtime configuration"; "spec_constants" => spec, "db_type" => db_type);
Err("Unknown specification and/or db_type.".into())

View File

@ -200,3 +200,37 @@ impl EthSpec for MinimalEthSpec {
}
pub type MinimalBeaconState = BeaconState<MinimalEthSpec>;
/// Interop testnet spec
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct InteropEthSpec;
impl EthSpec for InteropEthSpec {
type ShardCount = U8;
type SlotsPerEpoch = U8;
type SlotsPerHistoricalRoot = U64;
type SlotsPerEth1VotingPeriod = U16;
type EpochsPerHistoricalVector = U64;
type EpochsPerSlashingsVector = U64;
type MaxPendingAttestations = U1024; // 128 max attestations * 8 slots per epoch
params_from_eth_spec!(MainnetEthSpec {
JustificationBitsLength,
MaxValidatorsPerCommittee,
GenesisEpoch,
HistoricalRootsLimit,
ValidatorRegistryLimit,
MaxProposerSlashings,
MaxAttesterSlashings,
MaxAttestations,
MaxDeposits,
MaxVoluntaryExits,
MaxTransfers
});
fn default_spec() -> ChainSpec {
ChainSpec::interop()
}
}
pub type InteropBeaconState = BeaconState<InteropEthSpec>;

View File

@ -92,7 +92,7 @@ pub struct ChainSpec {
domain_transfer: u32,
pub boot_nodes: Vec<String>,
pub chain_id: u8,
pub network_id: u8,
}
impl ChainSpec {
@ -190,7 +190,7 @@ impl ChainSpec {
* Network specific
*/
boot_nodes: vec![],
chain_id: 1, // mainnet chain id
network_id: 1, // mainnet network id
}
}
@ -208,7 +208,23 @@ impl ChainSpec {
shuffle_round_count: 10,
min_genesis_active_validator_count: 64,
max_epochs_per_crosslink: 4,
chain_id: 2, // lighthouse testnet chain id
network_id: 2, // lighthouse testnet network id
boot_nodes,
..ChainSpec::mainnet()
}
}
/// Interop testing spec
///
/// This allows us to customize a chain spec for interop testing.
pub fn interop() -> Self {
let boot_nodes = vec![];
Self {
seconds_per_slot: 12,
target_committee_size: 4,
shuffle_round_count: 10,
network_id: 13,
boot_nodes,
..ChainSpec::mainnet()
}

View File

@ -37,6 +37,13 @@ impl Eth2Config {
spec: ChainSpec::minimal(),
}
}
pub fn interop() -> Self {
Self {
spec_constants: "interop".to_string(),
spec: ChainSpec::interop(),
}
}
}
impl Eth2Config {

View File

@ -45,7 +45,7 @@ service AttestationService {
message NodeInfoResponse {
string version = 1;
Fork fork = 2;
uint32 chain_id = 3;
uint32 network_id = 3;
uint64 genesis_time = 4;
uint64 genesis_slot = 5;
}

View File

@ -1,47 +0,0 @@
spec_constants = "minimal"
[spec]
target_committee_size = 4
max_indices_per_attestation = 4096
min_per_epoch_churn_limit = 4
churn_limit_quotient = 65536
base_rewards_per_epoch = 5
shuffle_round_count = 10
deposit_contract_tree_depth = 32
min_deposit_amount = 1000000000
max_effective_balance = 32000000000
ejection_balance = 16000000000
effective_balance_increment = 1000000000
genesis_slot = 0
zero_hash = "0x0000000000000000000000000000000000000000000000000000000000000000"
bls_withdrawal_prefix_byte = "0x00"
genesis_time = 4294967295
seconds_per_slot = 6
min_attestation_inclusion_delay = 2
min_seed_lookahead = 1
activation_exit_delay = 4
slots_per_eth1_voting_period = 16
slots_per_historical_root = 8192
min_validator_withdrawability_delay = 256
persistent_committee_period = 2048
max_crosslink_epochs = 64
min_epochs_to_inactivity_penalty = 4
base_reward_quotient = 32
whistleblowing_reward_quotient = 512
proposer_reward_quotient = 8
inactivity_penalty_quotient = 33554432
min_slashing_penalty_quotient = 32
max_proposer_slashings = 16
max_attester_slashings = 1
max_attestations = 128
max_deposits = 16
max_voluntary_exits = 16
max_transfers = 0
domain_beacon_proposer = 0
domain_randao = 1
domain_attestation = 2
domain_deposit = 3
domain_voluntary_exit = 4
domain_transfer = 5
boot_nodes = ["/ip4/127.0.0.1/tcp/9000"]
chain_id = 2

View File

@ -11,10 +11,10 @@ use crate::service::Service as ValidatorService;
use clap::{App, Arg};
use eth2_config::{read_from_file, write_to_file, Eth2Config};
use protos::services_grpc::ValidatorServiceClient;
use slog::{crit, error, info, o, Drain, Level};
use slog::{crit, error, info, o, warn, Drain, Level};
use std::fs;
use std::path::PathBuf;
use types::{Keypair, MainnetEthSpec, MinimalEthSpec};
use types::{InteropEthSpec, Keypair, MainnetEthSpec, MinimalEthSpec};
pub const DEFAULT_SPEC: &str = "minimal";
pub const DEFAULT_DATA_DIR: &str = ".lighthouse-validator";
@ -64,14 +64,13 @@ fn main() {
.takes_value(true),
)
.arg(
Arg::with_name("spec-constants")
.long("spec-constants")
Arg::with_name("default-spec")
.long("default-spec")
.value_name("TITLE")
.short("s")
.help("The title of the spec constants for chain config.")
.short("default-spec")
.help("Specifies the default eth2 spec to be used. This will override any spec written to disk and will therefore be used by default in future instances.")
.takes_value(true)
.possible_values(&["mainnet", "minimal"])
.default_value("minimal"),
.possible_values(&["mainnet", "minimal", "interop"])
)
.arg(
Arg::with_name("debug-level")
@ -126,7 +125,7 @@ fn main() {
let client_config_path = data_dir.join(CLIENT_CONFIG_FILENAME);
// Attempt to lead the `ClientConfig` from disk.
// Attempt to load the `ClientConfig` from disk.
//
// If file doesn't exist, create a new, default one.
let mut client_config = match read_from_file::<ValidatorClientConfig>(
@ -164,29 +163,58 @@ fn main() {
.and_then(|s| Some(PathBuf::from(s)))
.unwrap_or_else(|| data_dir.join(ETH2_CONFIG_FILENAME));
// Attempt to load the `Eth2Config` from file.
// Initialise the `Eth2Config`.
//
// If the file doesn't exist, create a default one depending on the CLI flags.
let mut eth2_config = match read_from_file::<Eth2Config>(eth2_config_path.clone()) {
Ok(Some(c)) => c,
Ok(None) => {
let default = match matches.value_of("spec-constants") {
Some("mainnet") => Eth2Config::mainnet(),
Some("minimal") => Eth2Config::minimal(),
_ => unreachable!(), // Guarded by slog.
};
if let Err(e) = write_to_file(eth2_config_path, &default) {
crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e));
return;
}
default
}
// If a CLI parameter is set, overwrite any config file present.
// If a parameter is not set, use either the config file present or default to minimal.
let cli_config = match matches.value_of("default-spec") {
Some("mainnet") => Some(Eth2Config::mainnet()),
Some("minimal") => Some(Eth2Config::minimal()),
Some("interop") => Some(Eth2Config::interop()),
_ => None,
};
// if a CLI flag is specified, write the new config if it doesn't exist,
// otherwise notify the user that the file will not be written.
let eth2_config_from_file = match read_from_file::<Eth2Config>(eth2_config_path.clone()) {
Ok(config) => config,
Err(e) => {
crit!(log, "Failed to instantiate an Eth2Config"; "error" => format!("{:?}", e));
crit!(log, "Failed to read the Eth2Config from file"; "error" => format!("{:?}", e));
return;
}
};
let mut eth2_config = {
if let Some(cli_config) = cli_config {
if eth2_config_from_file.is_none() {
// write to file if one doesn't exist
if let Err(e) = write_to_file(eth2_config_path, &cli_config) {
crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e));
return;
}
} else {
warn!(
log,
"Eth2Config file exists. Configuration file is ignored, using default"
);
}
cli_config
} else {
// CLI config not specified, read from disk
match eth2_config_from_file {
Some(config) => config,
None => {
// set default to minimal
let eth2_config = Eth2Config::minimal();
if let Err(e) = write_to_file(eth2_config_path, &eth2_config) {
crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e));
return;
}
eth2_config
}
}
}
};
// Update the eth2 config with any CLI flags.
match eth2_config.apply_cli_args(&matches) {
Ok(()) => (),
@ -214,6 +242,11 @@ fn main() {
eth2_config,
log.clone(),
),
"interop" => ValidatorService::<ValidatorServiceClient, Keypair, InteropEthSpec>::start(
client_config,
eth2_config,
log.clone(),
),
other => {
crit!(log, "Unknown spec constants"; "title" => other);
return;

View File

@ -23,7 +23,7 @@ use protos::services_grpc::{
AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient,
ValidatorServiceClient,
};
use slog::{error, info, warn};
use slog::{crit, error, info, warn};
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::marker::PhantomData;
use std::sync::Arc;
@ -37,7 +37,7 @@ use types::{ChainSpec, Epoch, EthSpec, Fork, Slot};
/// A fixed amount of time after a slot to perform operations. This gives the node time to complete
/// per-slot processes.
const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(200);
const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100);
/// The validator service. This is the main thread that executes and maintains validator
/// duties.
@ -106,13 +106,13 @@ impl<B: BeaconNodeDuties + 'static, S: Signer + 'static, E: EthSpec> Service<B,
);
return Err("Genesis time in the future".into());
}
// verify the node's chain id
if eth2_config.spec.chain_id != info.chain_id as u8 {
// verify the node's network id
if eth2_config.spec.network_id != info.network_id as u8 {
error!(
log,
"Beacon Node's genesis time is in the future. No work to do.\n Exiting"
);
return Err(format!("Beacon node has the wrong chain id. Expected chain id: {}, node's chain id: {}", eth2_config.spec.chain_id, info.chain_id).into());
return Err(format!("Beacon node has the wrong chain id. Expected chain id: {}, node's chain id: {}", eth2_config.spec.network_id, info.network_id).into());
}
break info;
}
@ -123,7 +123,7 @@ impl<B: BeaconNodeDuties + 'static, S: Signer + 'static, E: EthSpec> Service<B,
let genesis_time = node_info.get_genesis_time();
let genesis_slot = Slot::from(node_info.get_genesis_slot());
info!(log,"Beacon node connected"; "Node Version" => node_info.version.clone(), "Chain ID" => node_info.chain_id, "Genesis time" => genesis_time);
info!(log,"Beacon node connected"; "Node Version" => node_info.version.clone(), "Chain ID" => node_info.network_id, "Genesis time" => genesis_time);
let proto_fork = node_info.get_fork();
let mut previous_version: [u8; 4] = [0; 4];
@ -303,12 +303,16 @@ impl<B: BeaconNodeDuties + 'static, S: Signer + 'static, E: EthSpec> Service<B,
let current_epoch = current_slot.epoch(self.slots_per_epoch);
// this is a fatal error. If the slot clock repeats, there is something wrong with
// the timer, terminate immediately.
assert!(
current_slot > self.current_slot,
"The Timer should poll a new slot"
);
// this is a non-fatal error. If the slot clock repeats, the node could
// have been slow to process the previous slot and is now duplicating tasks.
// We ignore duplicated but raise a critical error.
if current_slot <= self.current_slot {
crit!(
self.log,
"The validator tried to duplicate a slot. Likely missed the previous slot"
);
return Err("Duplicate slot".into());
}
self.current_slot = current_slot;
info!(self.log, "Processing"; "slot" => current_slot.as_u64(), "epoch" => current_epoch.as_u64());
Ok(())