Merge branch 'master' into ci-rustfmt

This commit is contained in:
Luke Anderson 2019-03-26 17:04:06 +11:00
commit 72b7fb99a0
No known key found for this signature in database
GPG Key ID: 44408169EC61E228
26 changed files with 1290 additions and 471 deletions

View File

@ -29,4 +29,5 @@ members = [
"beacon_node/beacon_chain/test_harness",
"protos",
"validator_client",
"account_manager",
]

View File

@ -0,0 +1,13 @@
[package]
name = "account_manager"
version = "0.0.1"
authors = ["Luke Anderson <luke@sigmaprime.io>"]
edition = "2018"
[dependencies]
bls = { path = "../eth2/utils/bls" }
clap = "2.32.0"
slog = "^2.2.3"
slog-term = "^2.4.0"
slog-async = "^2.3.0"
validator_client = { path = "../validator_client" }

24
account_manager/README.md Normal file
View File

@ -0,0 +1,24 @@
# Lighthouse Accounts Manager
The accounts manager (AM) is a stand-alone binary which allows
users to generate and manage the cryptographic keys necessary to
interact with Ethereum Serenity.
## Roles
The AM is responsible for the following tasks:
- Generation of cryptographic key pairs
- Must acquire sufficient entropy to ensure keys are generated securely (TBD)
- Secure storage of private keys
- Keys must be encrypted while at rest on the disk (TBD)
- The format is compatible with the validator client
- Produces messages and transactions necessary to initiate
staking on Ethereum 1.x (TPD)
## Implementation
The AM is not a service, and does not run continuously, nor does it
interact with any running services.
It is intended to be executed separately from other Lighthouse binaries
and produce files which can be consumed by them.

View File

@ -0,0 +1,58 @@
use bls::Keypair;
use clap::{App, Arg, SubCommand};
use slog::{debug, info, o, Drain};
use std::path::PathBuf;
use validator_client::Config as ValidatorClientConfig;
fn main() {
// Logging
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::CompactFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let log = slog::Logger::root(drain, o!());
// CLI
let matches = App::new("Lighthouse Accounts Manager")
.version("0.0.1")
.author("Sigma Prime <contact@sigmaprime.io>")
.about("Eth 2.0 Accounts Manager")
.arg(
Arg::with_name("datadir")
.long("datadir")
.value_name("DIR")
.help("Data directory for keys and databases.")
.takes_value(true),
)
.subcommand(
SubCommand::with_name("generate")
.about("Generates a new validator private key")
.version("0.0.1")
.author("Sigma Prime <contact@sigmaprime.io>"),
)
.get_matches();
let config = ValidatorClientConfig::parse_args(&matches, &log)
.expect("Unable to build a configuration for the account manager.");
// Log configuration
info!(log, "";
"data_dir" => &config.data_dir.to_str());
match matches.subcommand() {
("generate", Some(_gen_m)) => {
let keypair = Keypair::random();
let key_path: PathBuf = config
.save_key(&keypair)
.expect("Unable to save newly generated private key.");
debug!(
log,
"Keypair generated {:?}, saved to: {:?}",
keypair.identifier(),
key_path.to_string_lossy()
);
}
_ => panic!(
"The account manager must be run with a subcommand. See help for more information."
),
}
}

View File

@ -6,7 +6,7 @@ edition = "2018"
[dependencies]
# SigP repository until PR is merged
libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" }
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "987fec350b5c2eb55e5f0139615adab49ce493ff" }
types = { path = "../../eth2/types" }
ssz = { path = "../../eth2/utils/ssz" }
ssz_derive = { path = "../../eth2/utils/ssz_derive" }

View File

@ -1,11 +1,18 @@
use crate::rpc::{RPCEvent, RPCMessage, Rpc};
use crate::NetworkConfig;
use futures::prelude::*;
use libp2p::{
core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent},
core::{
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
PublicKey,
},
gossipsub::{Gossipsub, GossipsubEvent},
identify::{protocol::IdentifyInfo, Identify, IdentifyEvent},
ping::{Ping, PingEvent},
tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId,
};
use slog::{debug, o};
use types::Topic;
/// Builds the network behaviour for the libp2p Swarm.
@ -13,12 +20,22 @@ use types::Topic;
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
/// The routing pub-sub mechanism for eth2.
gossipsub: Gossipsub<TSubstream>,
// TODO: Add Kademlia for peer discovery
/// The events generated by this behaviour to be consumed in the swarm poll.
serenity_rpc: Rpc<TSubstream>,
/// Allows discovery of IP addresses for peers on the network.
identify: Identify<TSubstream>,
/// Keep regular connection to peers and disconnect if absent.
// TODO: Keepalive, likely remove this later.
// TODO: Make the ping time customizeable.
ping: Ping<TSubstream>,
#[behaviour(ignore)]
events: Vec<BehaviourEvent>,
/// Logger for behaviour actions.
#[behaviour(ignore)]
log: slog::Logger,
}
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
@ -53,12 +70,54 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage
}
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<IdentifyEvent>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, event: IdentifyEvent) {
match event {
IdentifyEvent::Identified {
peer_id, mut info, ..
} => {
if info.listen_addrs.len() > 20 {
debug!(
self.log,
"More than 20 peers have been identified, truncating"
);
info.listen_addrs.truncate(20);
}
self.events.push(BehaviourEvent::Identified(peer_id, info));
}
IdentifyEvent::Error { .. } => {}
IdentifyEvent::SendBack { .. } => {}
}
}
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<PingEvent>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, _event: PingEvent) {
// not interested in ping responses at the moment.
}
}
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig, log: &slog::Logger) -> Self {
pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self {
let local_peer_id = local_public_key.clone().into_peer_id();
let identify_config = net_conf.identify_config.clone();
let behaviour_log = log.new(o!());
Behaviour {
gossipsub: Gossipsub::new(local_peer_id, gs_config),
gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()),
serenity_rpc: Rpc::new(log),
identify: Identify::new(
identify_config.version,
identify_config.user_agent,
local_public_key,
),
ping: Ping::new(),
events: Vec::new(),
log: behaviour_log,
}
}
@ -91,6 +150,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub enum BehaviourEvent {
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
Identified(PeerId, IdentifyInfo),
// TODO: This is a stub at the moment
Message(String),
}

View File

@ -1,11 +1,9 @@
use crate::Multiaddr;
use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder};
use libp2p::secio;
use std::fmt;
#[derive(Clone)]
#[derive(Clone, Debug)]
/// Network configuration for lighthouse.
pub struct NetworkConfig {
pub struct Config {
//TODO: stubbing networking initial params, change in the future
/// IP address to listen on.
pub listen_addresses: Vec<Multiaddr>,
@ -13,47 +11,58 @@ pub struct NetworkConfig {
pub listen_port: u16,
/// Gossipsub configuration parameters.
pub gs_config: GossipsubConfig,
/// Configuration parameters for node identification protocol.
pub identify_config: IdentifyConfig,
/// List of nodes to initially connect to.
pub boot_nodes: Vec<Multiaddr>,
/// Peer key related to this nodes PeerId.
pub local_private_key: secio::SecioKeyPair,
/// Client version
pub client_version: String,
/// List of topics to subscribe to as strings
pub topics: Vec<String>,
}
impl Default for NetworkConfig {
impl Default for Config {
/// Generate a default network configuration.
fn default() -> Self {
// TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this
// PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733
NetworkConfig {
Config {
listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000"
.parse()
.expect("is a correct multi-address")],
listen_port: 9000,
gs_config: GossipsubConfigBuilder::new().build(),
gs_config: GossipsubConfigBuilder::new()
.max_gossip_size(4_000_000)
.build(),
identify_config: IdentifyConfig::default(),
boot_nodes: Vec::new(),
local_private_key: secio::SecioKeyPair::secp256k1_generated().unwrap(),
client_version: version::version(),
topics: vec![String::from("beacon_chain")],
}
}
}
impl NetworkConfig {
impl Config {
pub fn new(boot_nodes: Vec<Multiaddr>) -> Self {
let mut conf = NetworkConfig::default();
let mut conf = Config::default();
conf.boot_nodes = boot_nodes;
conf
}
}
impl fmt::Debug for NetworkConfig {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NetworkConfig: listen_addresses: {:?}, listen_port: {:?}, gs_config: {:?}, boot_nodes: {:?}, local_private_key: <Secio-PubKey {:?}>, client_version: {:?}", self.listen_addresses, self.listen_port, self.gs_config, self.boot_nodes, self.local_private_key.to_public_key(), self.client_version)
/// The configuration parameters for the Identify protocol
#[derive(Debug, Clone)]
pub struct IdentifyConfig {
/// The protocol version to listen on.
pub version: String,
/// The client's name and version for identification.
pub user_agent: String,
}
impl Default for IdentifyConfig {
fn default() -> Self {
Self {
version: "/eth/serenity/1.0".to_string(),
user_agent: version::version(),
}
}
}

View File

@ -3,16 +3,16 @@
///
/// This crate builds and manages the libp2p services required by the beacon node.
pub mod behaviour;
mod config;
pub mod error;
mod network_config;
pub mod rpc;
mod service;
pub use config::Config as NetworkConfig;
pub use libp2p::{
gossipsub::{GossipsubConfig, GossipsubConfigBuilder},
PeerId,
};
pub use network_config::NetworkConfig;
pub use rpc::{HelloMessage, RPCEvent};
pub use service::Libp2pEvent;
pub use service::Service;

View File

@ -1,4 +1,4 @@
use super::methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse};
use super::methods::*;
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use ssz::{ssz_encode, Decodable, Encodable, SszStream};
use std::io;
@ -6,7 +6,7 @@ use std::iter;
use tokio::io::{AsyncRead, AsyncWrite};
/// The maximum bytes that can be sent across the RPC.
const MAX_READ_SIZE: usize = 2048;
const MAX_READ_SIZE: usize = 4_194_304; // 4M
/// Implementation of the `ConnectionUpgrade` for the rpc protocol.
@ -60,10 +60,13 @@ where
{
type Output = RPCEvent;
type Error = DecodeError;
type Future =
upgrade::ReadOneThen<TSocket, (), fn(Vec<u8>, ()) -> Result<RPCEvent, DecodeError>>;
type Future = upgrade::ReadOneThen<
upgrade::Negotiated<TSocket>,
(),
fn(Vec<u8>, ()) -> Result<RPCEvent, DecodeError>,
>;
fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
fn upgrade_inbound(self, socket: upgrade::Negotiated<TSocket>, _: Self::Info) -> Self::Future {
upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?))
}
}
@ -81,7 +84,31 @@ fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
let (hello_body, _index) = HelloMessage::ssz_decode(&packet, index)?;
RPCRequest::Hello(hello_body)
}
RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod),
RPCMethod::Goodbye => {
let (goodbye_code, _index) = u64::ssz_decode(&packet, index)?;
RPCRequest::Goodbye(goodbye_code)
}
RPCMethod::BeaconBlockRoots => {
let (block_roots_request, _index) =
BeaconBlockRootsRequest::ssz_decode(&packet, index)?;
RPCRequest::BeaconBlockRoots(block_roots_request)
}
RPCMethod::BeaconBlockHeaders => {
let (block_headers_request, _index) =
BeaconBlockHeadersRequest::ssz_decode(&packet, index)?;
RPCRequest::BeaconBlockHeaders(block_headers_request)
}
RPCMethod::BeaconBlockBodies => {
let (block_bodies_request, _index) =
BeaconBlockBodiesRequest::ssz_decode(&packet, index)?;
RPCRequest::BeaconBlockBodies(block_bodies_request)
}
RPCMethod::BeaconChainState => {
let (chain_state_request, _index) =
BeaconChainStateRequest::ssz_decode(&packet, index)?;
RPCRequest::BeaconChainState(chain_state_request)
}
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
};
Ok(RPCEvent::Request {
@ -97,7 +124,24 @@ fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
let (body, _index) = HelloMessage::ssz_decode(&packet, index)?;
RPCResponse::Hello(body)
}
RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod),
RPCMethod::Goodbye => unreachable!("Should never receive a goodbye response"),
RPCMethod::BeaconBlockRoots => {
let (body, _index) = BeaconBlockRootsResponse::ssz_decode(&packet, index)?;
RPCResponse::BeaconBlockRoots(body)
}
RPCMethod::BeaconBlockHeaders => {
let (body, _index) = BeaconBlockHeadersResponse::ssz_decode(&packet, index)?;
RPCResponse::BeaconBlockHeaders(body)
}
RPCMethod::BeaconBlockBodies => {
let (body, _index) = BeaconBlockBodiesResponse::ssz_decode(&packet, index)?;
RPCResponse::BeaconBlockBodies(body)
}
RPCMethod::BeaconChainState => {
let (body, _index) = BeaconChainStateResponse::ssz_decode(&packet, index)?;
RPCResponse::BeaconChainState(body)
}
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
};
Ok(RPCEvent::Response {
id,
@ -113,10 +157,10 @@ where
{
type Output = ();
type Error = io::Error;
type Future = upgrade::WriteOne<TSocket>;
type Future = upgrade::WriteOne<upgrade::Negotiated<TSocket>>;
#[inline]
fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
fn upgrade_outbound(self, socket: upgrade::Negotiated<TSocket>, _: Self::Info) -> Self::Future {
let bytes = ssz_encode(&self);
upgrade::write_one(socket, bytes)
}
@ -137,7 +181,21 @@ impl Encodable for RPCEvent {
RPCRequest::Hello(body) => {
s.append(body);
}
_ => {}
RPCRequest::Goodbye(body) => {
s.append(body);
}
RPCRequest::BeaconBlockRoots(body) => {
s.append(body);
}
RPCRequest::BeaconBlockHeaders(body) => {
s.append(body);
}
RPCRequest::BeaconBlockBodies(body) => {
s.append(body);
}
RPCRequest::BeaconChainState(body) => {
s.append(body);
}
}
}
RPCEvent::Response {
@ -152,7 +210,18 @@ impl Encodable for RPCEvent {
RPCResponse::Hello(response) => {
s.append(response);
}
_ => {}
RPCResponse::BeaconBlockRoots(response) => {
s.append(response);
}
RPCResponse::BeaconBlockHeaders(response) => {
s.append(response);
}
RPCResponse::BeaconBlockBodies(response) => {
s.append(response);
}
RPCResponse::BeaconChainState(response) => {
s.append(response);
}
}
}
}

View File

@ -6,13 +6,14 @@ use crate::NetworkConfig;
use futures::prelude::*;
use futures::Stream;
use libp2p::core::{
identity,
muxing::StreamMuxerBox,
nodes::Substream,
transport::boxed::Boxed,
upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
};
use libp2p::{core, secio, Transport};
use libp2p::{PeerId, Swarm};
use libp2p::identify::protocol::IdentifyInfo;
use libp2p::{core, secio, PeerId, Swarm, Transport};
use slog::{debug, info, trace, warn};
use std::io::{Error, ErrorKind};
use std::time::Duration;
@ -33,15 +34,20 @@ impl Service {
pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result<Self> {
debug!(log, "Libp2p Service starting");
let local_private_key = config.local_private_key;
let local_peer_id = local_private_key.to_peer_id();
// TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this
// PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733
// TODO: Save and recover node key from disk
let local_private_key = identity::Keypair::generate_secp256k1();
let local_public_key = local_private_key.public();
let local_peer_id = PeerId::from(local_private_key.public());
info!(log, "Local peer id: {:?}", local_peer_id);
let mut swarm = {
// Set up the transport
let transport = build_transport(local_private_key);
// Set up gossipsub routing
let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config, &log);
let behaviour = Behaviour::new(local_public_key.clone(), &config, &log);
// Set up Topology
let topology = local_peer_id.clone();
Swarm::new(transport, behaviour, topology)
@ -99,17 +105,23 @@ impl Stream for Service {
// TODO: Currently only gossipsub events passed here.
// Build a type for more generic events
match self.swarm.poll() {
Ok(Async::Ready(Some(BehaviourEvent::Message(m)))) => {
//Behaviour events
Ok(Async::Ready(Some(event))) => match event {
// TODO: Stub here for debugging
debug!(self.log, "Message received: {}", m);
return Ok(Async::Ready(Some(Libp2pEvent::Message(m))));
}
Ok(Async::Ready(Some(BehaviourEvent::RPC(peer_id, event)))) => {
return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))));
}
Ok(Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id)))) => {
return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))));
}
BehaviourEvent::Message(m) => {
debug!(self.log, "Message received: {}", m);
return Ok(Async::Ready(Some(Libp2pEvent::Message(m))));
}
BehaviourEvent::RPC(peer_id, event) => {
return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))));
}
BehaviourEvent::PeerDialed(peer_id) => {
return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))));
}
BehaviourEvent::Identified(peer_id, info) => {
return Ok(Async::Ready(Some(Libp2pEvent::Identified(peer_id, info))));
}
},
Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
Ok(Async::NotReady) => break,
_ => break,
@ -121,9 +133,7 @@ impl Stream for Service {
/// The implementation supports TCP/IP, WebSockets over TCP/IP, secio as the encryption layer, and
/// mplex or yamux as the multiplexing layer.
fn build_transport(
local_private_key: secio::SecioKeyPair,
) -> Boxed<(PeerId, StreamMuxerBox), Error> {
fn build_transport(local_private_key: identity::Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> {
// TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised
// in the future.
let transport = libp2p::tcp::TcpConfig::new();
@ -156,8 +166,12 @@ fn build_transport(
/// Events that can be obtained from polling the Libp2p Service.
pub enum Libp2pEvent {
// We have received an RPC event on the swarm
/// An RPC response request has been received on the swarm.
RPC(PeerId, RPCEvent),
/// Initiated the connection to a new peer.
PeerDialed(PeerId),
/// Received information about a peer on the network.
Identified(PeerId, IdentifyInfo),
// TODO: Pub-sub testing only.
Message(String),
}

View File

@ -15,8 +15,8 @@ use tokio::runtime::TaskExecutor;
/// Service that handles communication between internal services and the eth2_libp2p network service.
pub struct Service {
//eth2_libp2p_service: Arc<Mutex<LibP2PService>>,
eth2_libp2p_exit: oneshot::Sender<()>,
//libp2p_service: Arc<Mutex<LibP2PService>>,
libp2p_exit: oneshot::Sender<()>,
network_send: crossbeam_channel::Sender<NetworkMessage>,
//message_handler: MessageHandler,
//message_handler_send: Sender<HandlerMessage>,
@ -40,20 +40,20 @@ impl Service {
message_handler_log,
)?;
// launch eth2_libp2p service
let eth2_libp2p_log = log.new(o!("Service" => "Libp2p"));
let eth2_libp2p_service = LibP2PService::new(config.clone(), eth2_libp2p_log)?;
// launch libp2p service
let libp2p_log = log.new(o!("Service" => "Libp2p"));
let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?;
// TODO: Spawn thread to handle eth2_libp2p messages and pass to message handler thread.
let eth2_libp2p_exit = spawn_service(
eth2_libp2p_service,
// TODO: Spawn thread to handle libp2p messages and pass to message handler thread.
let libp2p_exit = spawn_service(
libp2p_service,
network_recv,
message_handler_send,
executor,
log,
)?;
let network_service = Service {
eth2_libp2p_exit,
libp2p_exit,
network_send: network_send.clone(),
};
@ -72,7 +72,7 @@ impl Service {
}
fn spawn_service(
eth2_libp2p_service: LibP2PService,
libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
executor: &TaskExecutor,
@ -83,7 +83,7 @@ fn spawn_service(
// spawn on the current executor
executor.spawn(
network_service(
eth2_libp2p_service,
libp2p_service,
network_recv,
message_handler_send,
log.clone(),
@ -100,7 +100,7 @@ fn spawn_service(
}
fn network_service(
mut eth2_libp2p_service: LibP2PService,
mut libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
log: slog::Logger,
@ -108,28 +108,34 @@ fn network_service(
futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> {
// poll the swarm
loop {
match eth2_libp2p_service.poll() {
Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, rpc_event)))) => {
trace!(
eth2_libp2p_service.log,
"RPC Event: RPC message received: {:?}",
rpc_event
);
message_handler_send
.send(HandlerMessage::RPC(peer_id, rpc_event))
.map_err(|_| "failed to send rpc to handler")?;
}
Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => {
debug!(eth2_libp2p_service.log, "Peer Dialed: {:?}", peer_id);
message_handler_send
.send(HandlerMessage::PeerDialed(peer_id))
.map_err(|_| "failed to send rpc to handler")?;
}
Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!(
eth2_libp2p_service.log,
"Network Service: Message received: {}", m
),
_ => break,
match libp2p_service.poll() {
Ok(Async::Ready(Some(event))) => match event {
Libp2pEvent::RPC(peer_id, rpc_event) => {
trace!(log, "RPC Event: RPC message received: {:?}", rpc_event);
message_handler_send
.send(HandlerMessage::RPC(peer_id, rpc_event))
.map_err(|_| "failed to send rpc to handler")?;
}
Libp2pEvent::PeerDialed(peer_id) => {
debug!(log, "Peer Dialed: {:?}", peer_id);
message_handler_send
.send(HandlerMessage::PeerDialed(peer_id))
.map_err(|_| "failed to send rpc to handler")?;
}
Libp2pEvent::Identified(peer_id, info) => {
debug!(
log,
"We have identified peer: {:?} with {:?}", peer_id, info
);
}
Libp2pEvent::Message(m) => debug!(
libp2p_service.log,
"Network Service: Message received: {}", m
),
},
Ok(Async::Ready(None)) => unreachable!("Stream never ends"),
Ok(Async::NotReady) => break,
Err(_) => break,
}
}
// poll the network channel
@ -143,7 +149,7 @@ fn network_service(
trace!(log, "Sending RPC Event: {:?}", rpc_event);
//TODO: Make swarm private
//TODO: Implement correct peer id topic message handling
eth2_libp2p_service.swarm.send_rpc(peer_id, rpc_event);
libp2p_service.swarm.send_rpc(peer_id, rpc_event);
}
OutgoingMessage::NotifierTest => {
debug!(log, "Received message from notifier");
@ -165,7 +171,7 @@ fn network_service(
/// Types of messages that the network service can receive.
#[derive(Debug, Clone)]
pub enum NetworkMessage {
/// Send a message to eth2_libp2p service.
/// Send a message to libp2p service.
//TODO: Define typing for messages across the wire
Send(PeerId, OutgoingMessage),
}

File diff suppressed because it is too large Load Diff

View File

@ -15,12 +15,10 @@ pub struct TestDoc {
pub title: String,
pub summary: String,
pub fork: String,
pub version: String,
pub test_cases: Vec<TestCase>,
}
#[test]
#[ignore]
fn yaml() {
use serde_yaml;
use std::{fs::File, io::prelude::*, path::PathBuf};

View File

@ -10,6 +10,7 @@ boolean-bitfield = { path = "../utils/boolean-bitfield" }
dirs = "1.0"
ethereum-types = "0.5"
hashing = { path = "../utils/hashing" }
hex = "0.3"
honey-badger-split = { path = "../utils/honey-badger-split" }
int_to_bytes = { path = "../utils/int_to_bytes" }
log = "0.4"
@ -24,7 +25,7 @@ ssz = { path = "../utils/ssz" }
ssz_derive = { path = "../utils/ssz_derive" }
swap_or_not_shuffle = { path = "../utils/swap_or_not_shuffle" }
test_random_derive = { path = "../utils/test_random_derive" }
libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" }
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "987fec350b5c2eb55e5f0139615adab49ce493ff" }
[dev-dependencies]
env_logger = "0.6.0"

View File

@ -2,6 +2,7 @@ use crate::*;
use bls::Signature;
use int_to_bytes::int_to_bytes4;
use serde_derive::Deserialize;
use test_utils::u8_from_hex_str;
const GWEI: u64 = 1_000_000_000;
@ -57,6 +58,7 @@ pub struct ChainSpec {
pub far_future_epoch: Epoch,
pub zero_hash: Hash256,
pub empty_signature: Signature,
#[serde(deserialize_with = "u8_from_hex_str")]
pub bls_withdrawal_prefix_byte: u8,
/*

View File

@ -1,4 +1,7 @@
use crate::{test_utils::TestRandom, ChainSpec, Epoch};
use crate::{
test_utils::{fork_from_hex_str, TestRandom},
ChainSpec, Epoch,
};
use int_to_bytes::int_to_bytes4;
use rand::RngCore;
use serde_derive::{Deserialize, Serialize};
@ -12,7 +15,9 @@ use test_random_derive::TestRandom;
Debug, Clone, PartialEq, Default, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom,
)]
pub struct Fork {
#[serde(deserialize_with = "fork_from_hex_str")]
pub previous_version: [u8; 4],
#[serde(deserialize_with = "fork_from_hex_str")]
pub current_version: [u8; 4],
pub epoch: Epoch,
}

View File

@ -2,6 +2,7 @@
mod macros;
mod generate_deterministic_keypairs;
mod keypairs_file;
mod serde_utils;
mod test_random;
mod testing_attestation_builder;
mod testing_attestation_data_builder;
@ -17,6 +18,7 @@ mod testing_voluntary_exit_builder;
pub use generate_deterministic_keypairs::generate_deterministic_keypairs;
pub use keypairs_file::KeypairsFile;
pub use rand::{prng::XorShiftRng, SeedableRng};
pub use serde_utils::{fork_from_hex_str, u8_from_hex_str};
pub use test_random::TestRandom;
pub use testing_attestation_builder::TestingAttestationBuilder;
pub use testing_attestation_data_builder::TestingAttestationDataBuilder;

View File

@ -0,0 +1,28 @@
use serde::de::Error;
use serde::{Deserialize, Deserializer};
pub fn u8_from_hex_str<'de, D>(deserializer: D) -> Result<u8, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
u8::from_str_radix(&s.as_str()[2..], 16).map_err(D::Error::custom)
}
pub fn fork_from_hex_str<'de, D>(deserializer: D) -> Result<[u8; 4], D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
let mut array = [0 as u8; 4];
let decoded: Vec<u8> = hex::decode(&s.as_str()[2..]).map_err(D::Error::custom)?;
for (i, item) in array.iter_mut().enumerate() {
if i > decoded.len() {
break;
}
*item = decoded[i];
}
Ok(array)
}

View File

@ -14,4 +14,8 @@ impl Keypair {
let pk = PublicKey::from_secret_key(&sk);
Keypair { sk, pk }
}
pub fn identifier(&self) -> String {
self.pk.concatenated_hex_id()
}
}

View File

@ -8,13 +8,14 @@ impl<'de> Visitor<'de> for HexVisitor {
type Value = Vec<u8>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a hex string (without 0x prefix)")
formatter.write_str("a hex string (irrelevant of prefix)")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(hex::decode(value).map_err(|e| de::Error::custom(format!("invalid hex ({:?})", e)))?)
Ok(hex::decode(value.trim_start_matches("0x"))
.map_err(|e| de::Error::custom(format!("invalid hex ({:?})", e)))?)
}
}

View File

@ -13,27 +13,35 @@ use ssz::{
/// This struct is a wrapper upon a base type and provides helper functions (e.g., SSZ
/// serialization).
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct Signature(RawSignature);
pub struct Signature {
signature: RawSignature,
is_empty: bool,
}
impl Signature {
/// Instantiate a new Signature from a message and a SecretKey.
pub fn new(msg: &[u8], domain: u64, sk: &SecretKey) -> Self {
Signature(RawSignature::new(msg, domain, sk.as_raw()))
Signature {
signature: RawSignature::new(msg, domain, sk.as_raw()),
is_empty: false,
}
}
/// Instantiate a new Signature from a message and a SecretKey, where the message has already
/// been hashed.
pub fn new_hashed(x_real_hashed: &[u8], x_imaginary_hashed: &[u8], sk: &SecretKey) -> Self {
Signature(RawSignature::new_hashed(
x_real_hashed,
x_imaginary_hashed,
sk.as_raw(),
))
Signature {
signature: RawSignature::new_hashed(x_real_hashed, x_imaginary_hashed, sk.as_raw()),
is_empty: false,
}
}
/// Verify the Signature against a PublicKey.
pub fn verify(&self, msg: &[u8], domain: u64, pk: &PublicKey) -> bool {
self.0.verify(msg, domain, pk.as_raw())
if self.is_empty {
return false;
}
self.signature.verify(msg, domain, pk.as_raw())
}
/// Verify the Signature against a PublicKey, where the message has already been hashed.
@ -43,44 +51,72 @@ impl Signature {
x_imaginary_hashed: &[u8],
pk: &PublicKey,
) -> bool {
self.0
self.signature
.verify_hashed(x_real_hashed, x_imaginary_hashed, pk.as_raw())
}
/// Returns the underlying signature.
pub fn as_raw(&self) -> &RawSignature {
&self.0
&self.signature
}
/// Returns a new empty signature.
pub fn empty_signature() -> Self {
// Empty Signature is currently being represented as BLS::Signature.point_at_infinity()
// However it should be represented as vec![0; 96] but this
// would require all signatures to be represented in byte form as opposed to Signature
// Set RawSignature = infinity
let mut empty: Vec<u8> = vec![0; 96];
// Sets C_flag and B_flag to 1 and all else to 0
empty[0] += u8::pow(2, 6) + u8::pow(2, 7);
Signature(RawSignature::from_bytes(&empty).unwrap())
Signature {
signature: RawSignature::from_bytes(&empty).unwrap(),
is_empty: true,
}
}
// Converts a BLS Signature to bytes
pub fn as_bytes(&self) -> Vec<u8> {
if self.is_empty {
return vec![0; 96];
}
self.signature.as_bytes()
}
// Convert bytes to BLS Signature
pub fn from_bytes(bytes: &[u8]) -> Result<Self, DecodeError> {
for byte in bytes {
if *byte != 0 {
let raw_signature =
RawSignature::from_bytes(&bytes).map_err(|_| DecodeError::Invalid)?;
return Ok(Signature {
signature: raw_signature,
is_empty: false,
});
}
}
Ok(Signature::empty_signature())
}
// Check for empty Signature
pub fn is_empty(&self) -> bool {
self.is_empty
}
}
impl Encodable for Signature {
fn ssz_append(&self, s: &mut SszStream) {
s.append_vec(&self.0.as_bytes());
s.append_vec(&self.as_bytes());
}
}
impl Decodable for Signature {
fn ssz_decode(bytes: &[u8], i: usize) -> Result<(Self, usize), DecodeError> {
let (sig_bytes, i) = decode_ssz_list(bytes, i)?;
let raw_sig = RawSignature::from_bytes(&sig_bytes).map_err(|_| DecodeError::TooShort)?;
Ok((Signature(raw_sig), i))
let signature = Signature::from_bytes(&sig_bytes)?;
Ok((signature, i))
}
}
impl TreeHash for Signature {
fn hash_tree_root(&self) -> Vec<u8> {
hash(&self.0.as_bytes())
hash(&self.as_bytes())
}
}

View File

@ -4,6 +4,15 @@ version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
[[bin]]
name = "validator_client"
path = "src/main.rs"
[lib]
name = "validator_client"
path = "src/lib.rs"
[dependencies]
block_proposer = { path = "../eth2/block_proposer" }
bls = { path = "../eth2/utils/bls" }
@ -18,3 +27,4 @@ slog = "^2.2.3"
slog-term = "^2.4.0"
slog-async = "^2.3.0"
ssz = { path = "../eth2/utils/ssz" }
bincode = "^1.1.2"

View File

@ -57,10 +57,30 @@ complete and return a block from the BN.
### Configuration
Presently the validator specifics (pubkey, etc.) are randomly generated and the
chain specification (slot length, BLS domain, etc.) are fixed to foundation
parameters. This is temporary and will be upgrade so these parameters can be
read from file (or initialized on first-boot).
Validator configurations are stored in a separate data directory from the main Beacon Node
binary. The validator data directory defaults to:
`$HOME/.lighthouse-validator`, however an alternative can be specified on the command line
with `--datadir`.
The configuration directory structure looks like:
```
~/.lighthouse-validator
├── 3cf4210d58ec
│   └── private.key
├── 9b5d8b5be4e7
│   └── private.key
└── cf6e07188f48
└── private.key
```
Where the hex value of the directory is a portion of the validator public key.
Validator keys must be generated using the separate `accounts_manager` binary, which will
place the keys into this directory structure in a format compatible with the validator client.
The chain specification (slot length, BLS domain, etc.) defaults to foundation
parameters, however is temporary and an upgrade will allow these parameters to be
read from a file (or initialized on first-boot).
## BN Communication

View File

@ -1,28 +1,39 @@
use bincode;
use bls::Keypair;
use clap::ArgMatches;
use slog::{debug, error, info};
use std::fs;
use std::fs::File;
use std::io::{Error, ErrorKind};
use std::path::PathBuf;
use types::ChainSpec;
/// Stores the core configuration for this validator instance.
#[derive(Clone)]
pub struct ClientConfig {
pub struct Config {
/// The data directory, which stores all validator databases
pub data_dir: PathBuf,
/// The server at which the Beacon Node can be contacted
pub server: String,
/// The chain specification that we are connecting to
pub spec: ChainSpec,
}
const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse-validators";
const DEFAULT_PRIVATE_KEY_FILENAME: &str = "private.key";
impl ClientConfig {
/// Build a new configuration from defaults.
pub fn default() -> Self {
impl Default for Config {
fn default() -> Self {
let data_dir = {
let home = dirs::home_dir().expect("Unable to determine home dir.");
home.join(DEFAULT_LIGHTHOUSE_DIR)
let home = dirs::home_dir().expect("Unable to determine home directory.");
home.join(".lighthouse-validator")
};
fs::create_dir_all(&data_dir)
.unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir));
let server = "localhost:50051".to_string();
let spec = ChainSpec::foundation();
Self {
data_dir,
server,
@ -30,3 +41,114 @@ impl ClientConfig {
}
}
}
impl Config {
/// Build a new configuration from defaults, which are overrided by arguments provided.
pub fn parse_args(args: &ArgMatches, log: &slog::Logger) -> Result<Self, Error> {
let mut config = Config::default();
// Use the specified datadir, or default in the home directory
if let Some(datadir) = args.value_of("datadir") {
config.data_dir = PathBuf::from(datadir);
fs::create_dir_all(&config.data_dir)
.unwrap_or_else(|_| panic!("Unable to create {:?}", &config.data_dir));
info!(log, "Using custom data dir: {:?}", &config.data_dir);
};
if let Some(srv) = args.value_of("server") {
//TODO: I don't think this parses correctly a server & port combo
config.server = srv.to_string();
info!(log, "Using custom server: {:?}", &config.server);
};
// TODO: Permit loading a custom spec from file.
if let Some(spec_str) = args.value_of("spec") {
info!(log, "Using custom spec: {:?}", spec_str);
config.spec = match spec_str {
"foundation" => ChainSpec::foundation(),
"few_validators" => ChainSpec::few_validators(),
// Should be impossible due to clap's `possible_values(..)` function.
_ => unreachable!(),
};
};
Ok(config)
}
/// Try to load keys from validator_dir, returning None if none are found or an error.
pub fn fetch_keys(&self, log: &slog::Logger) -> Option<Vec<Keypair>> {
let key_pairs: Vec<Keypair> = fs::read_dir(&self.data_dir)
.unwrap()
.filter_map(|validator_dir| {
let validator_dir = validator_dir.ok()?;
if !(validator_dir.file_type().ok()?.is_dir()) {
// Skip non-directories (i.e. no files/symlinks)
return None;
}
let key_filename = validator_dir.path().join(DEFAULT_PRIVATE_KEY_FILENAME);
if !(key_filename.is_file()) {
info!(
log,
"Private key is not a file: {:?}",
key_filename.to_str()
);
return None;
}
debug!(
log,
"Deserializing private key from file: {:?}",
key_filename.to_str()
);
let mut key_file = File::open(key_filename.clone()).ok()?;
let key: Keypair = if let Ok(key_ok) = bincode::deserialize_from(&mut key_file) {
key_ok
} else {
error!(
log,
"Unable to deserialize the private key file: {:?}", key_filename
);
return None;
};
let ki = key.identifier();
if ki != validator_dir.file_name().into_string().ok()? {
error!(
log,
"The validator key ({:?}) did not match the directory filename {:?}.",
ki,
&validator_dir.path().to_string_lossy()
);
return None;
}
Some(key)
})
.collect();
// Check if it's an empty vector, and return none.
if key_pairs.is_empty() {
None
} else {
Some(key_pairs)
}
}
/// Saves a keypair to a file inside the appropriate validator directory. Returns the saved path filename.
pub fn save_key(&self, key: &Keypair) -> Result<PathBuf, Error> {
let validator_config_path = self.data_dir.join(key.identifier());
let key_path = validator_config_path.join(DEFAULT_PRIVATE_KEY_FILENAME);
fs::create_dir_all(&validator_config_path)?;
let mut key_file = File::create(&key_path)?;
bincode::serialize_into(&mut key_file, &key)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
Ok(key_path)
}
}

View File

@ -0,0 +1,3 @@
pub mod config;
pub use crate::config::Config;

View File

@ -1,17 +1,14 @@
use self::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService};
use self::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap};
use crate::config::ClientConfig;
use crate::config::Config;
use block_proposer::{test_utils::LocalSigner, BlockProducer};
use bls::Keypair;
use clap::{App, Arg};
use grpcio::{ChannelBuilder, EnvBuilder};
use protos::services_grpc::{BeaconBlockServiceClient, ValidatorServiceClient};
use slog::{error, info, o, Drain};
use slog::{info, o, Drain};
use slot_clock::SystemTimeSlotClock;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use types::ChainSpec;
mod block_producer_service;
mod config;
@ -55,36 +52,11 @@ fn main() {
)
.get_matches();
let mut config = ClientConfig::default();
// Custom datadir
if let Some(dir) = matches.value_of("datadir") {
config.data_dir = PathBuf::from(dir.to_string());
}
// Custom server port
if let Some(server_str) = matches.value_of("server") {
if let Ok(addr) = server_str.parse::<u16>() {
config.server = addr.to_string();
} else {
error!(log, "Invalid address"; "server" => server_str);
return;
}
}
// TODO: Permit loading a custom spec from file.
// Custom spec
if let Some(spec_str) = matches.value_of("spec") {
match spec_str {
"foundation" => config.spec = ChainSpec::foundation(),
"few_validators" => config.spec = ChainSpec::few_validators(),
// Should be impossible due to clap's `possible_values(..)` function.
_ => unreachable!(),
};
}
let config = Config::parse_args(&matches, &log)
.expect("Unable to build a configuration for the validator client.");
// Log configuration
info!(log, "";
info!(log, "Configuration parameters:";
"data_dir" => &config.data_dir.to_str(),
"server" => &config.server);
@ -119,13 +91,13 @@ fn main() {
let poll_interval_millis = spec.seconds_per_slot * 1000 / 10; // 10% epoch time precision.
info!(log, "Starting block producer service"; "polls_per_epoch" => spec.seconds_per_slot * 1000 / poll_interval_millis);
let keypairs = config.fetch_keys(&log)
.expect("No key pairs found in configuration, they must first be generated with: account_manager generate.");
/*
* Start threads.
*/
let mut threads = vec![];
// TODO: keypairs are randomly generated; they should be loaded from a file or generated.
// https://github.com/sigp/lighthouse/issues/160
let keypairs = vec![Keypair::random()];
for keypair in keypairs {
info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id());