From 27bfec6692d0b29d2e01e1ec42a3ce27b8ee1234 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 16 Jan 2019 20:38:53 +1100 Subject: [PATCH] Add duties service to validator --- protos/src/services.rs | 2 +- validator_client/Cargo.toml | 3 + validator_client/src/block_producer/mod.rs | 2 +- validator_client/src/config.rs | 25 ++++ validator_client/src/duties/mod.rs | 149 +++++++++++++++++++++ validator_client/src/duties/service.rs | 33 +++++ validator_client/src/duties/test_node.rs | 28 ++++ validator_client/src/duties/traits.rs | 16 +++ validator_client/src/main.rs | 92 ++++++++----- 9 files changed, 316 insertions(+), 34 deletions(-) create mode 100644 validator_client/src/config.rs create mode 100644 validator_client/src/duties/mod.rs create mode 100644 validator_client/src/duties/service.rs create mode 100644 validator_client/src/duties/test_node.rs create mode 100644 validator_client/src/duties/traits.rs diff --git a/protos/src/services.rs b/protos/src/services.rs index 226e0ae9a..0a79a96a9 100644 --- a/protos/src/services.rs +++ b/protos/src/services.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.0.5. Do not edit +// This file is generated by rust-protobuf 2.0.6. Do not edit // @generated // https://github.com/Manishearth/rust-clippy/issues/702 diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 77f57d601..05a9a640f 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -5,6 +5,9 @@ authors = ["Paul Hauner "] edition = "2018" [dependencies] +bls = { path = "../beacon_chain/utils/bls" } +clap = "2.32.0" +dirs = "1.0.3" grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } protobuf = "2.0.2" protos = { path = "../protos" } diff --git a/validator_client/src/block_producer/mod.rs b/validator_client/src/block_producer/mod.rs index 5987f3a24..af12010da 100644 --- a/validator_client/src/block_producer/mod.rs +++ b/validator_client/src/block_producer/mod.rs @@ -4,7 +4,7 @@ mod test_node; mod traits; use self::traits::{BeaconNode, BeaconNodeError}; -use crate::EpochDuties; +use crate::duties::EpochDuties; use slot_clock::SlotClock; use spec::ChainSpec; use std::collections::HashMap; diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs new file mode 100644 index 000000000..4b646074d --- /dev/null +++ b/validator_client/src/config.rs @@ -0,0 +1,25 @@ +use std::fs; +use std::path::PathBuf; + +/// Stores the core configuration for this validator instance. +#[derive(Clone)] +pub struct ClientConfig { + pub data_dir: PathBuf, + pub server: String, +} + +const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse-validators"; + +impl ClientConfig { + /// Build a new lighthouse configuration from defaults. + pub fn default() -> Self { + let data_dir = { + let home = dirs::home_dir().expect("Unable to determine home dir."); + home.join(DEFAULT_LIGHTHOUSE_DIR) + }; + fs::create_dir_all(&data_dir) + .unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir)); + let server = "localhost:50051".to_string(); + Self { data_dir, server } + } +} diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs new file mode 100644 index 000000000..f3c25f221 --- /dev/null +++ b/validator_client/src/duties/mod.rs @@ -0,0 +1,149 @@ +use self::traits::{BeaconNode, BeaconNodeError}; +use bls::PublicKey; +use slot_clock::SlotClock; +use spec::ChainSpec; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +mod service; +mod test_node; +mod traits; + +#[derive(Debug, PartialEq, Clone, Copy, Default)] +pub struct EpochDuties { + pub block_production_slot: Option, + pub shard: Option, +} + +impl EpochDuties { + pub fn is_block_production_slot(&self, slot: u64) -> bool { + match self.block_production_slot { + Some(s) if s == slot => true, + _ => false, + } + } +} + +type EpochDutiesMap = HashMap<(PublicKey, u64), EpochDuties>; + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum PollOutcome { + NoChange, + NewDuties, + DutiesChanged, + UnknownValidatorOrEpoch, +} + +#[derive(Debug, PartialEq)] +pub enum Error { + SlotClockError, + SlotUnknowable, + EpochMapPoisoned, + SlotClockPoisoned, + EpochLengthIsZero, + BeaconNodeError(BeaconNodeError), +} + +pub struct DutiesManager { + pub duties_map: Arc>, + pub pubkey: PublicKey, + pub spec: Arc, + pub slot_clock: Arc>, + pub beacon_node: Arc, +} + +impl DutiesManager { + pub fn poll(&self) -> Result { + let slot = self + .slot_clock + .read() + .map_err(|_| Error::SlotClockPoisoned)? + .present_slot() + .map_err(|_| Error::SlotClockError)? + .ok_or(Error::SlotUnknowable)?; + + let epoch = slot + .checked_div(self.spec.epoch_length) + .ok_or(Error::EpochLengthIsZero)?; + + if let Some(duties) = self.beacon_node.request_shuffling(epoch, &self.pubkey)? { + let mut map = self + .duties_map + .write() + .map_err(|_| Error::EpochMapPoisoned)?; + + // If these duties were known, check to see if they're updates or identical. + let result = if let Some(known_duties) = map.get(&(self.pubkey.clone(), epoch)) { + if *known_duties == duties { + Ok(PollOutcome::NoChange) + } else { + Ok(PollOutcome::DutiesChanged) + } + } else { + Ok(PollOutcome::NewDuties) + }; + map.insert((self.pubkey.clone(), epoch), duties); + result + } else { + Ok(PollOutcome::UnknownValidatorOrEpoch) + } + } + +} + +impl From for Error { + fn from(e: BeaconNodeError) -> Error { + Error::BeaconNodeError(e) + } +} + +#[cfg(test)] +mod tests { + use super::test_node::TestBeaconNode; + use super::*; + use slot_clock::TestingSlotClock; + use bls::Keypair; + + // TODO: implement more thorough testing. + // + // These tests should serve as a good example for future tests. + + #[test] + pub fn polling() { + let spec = Arc::new(ChainSpec::foundation()); + let duties_map = Arc::new(RwLock::new(EpochDutiesMap::new())); + let keypair = Keypair::random(); + let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0))); + let beacon_node = Arc::new(TestBeaconNode::default()); + + let manager = DutiesManager { + spec: spec.clone(), + pubkey: keypair.pk.clone(), + duties_map: duties_map.clone(), + slot_clock: slot_clock.clone(), + beacon_node: beacon_node.clone(), + }; + + // Configure response from the BeaconNode. + beacon_node.set_next_shuffling_result(Ok(Some(EpochDuties{ + block_production_slot: Some(10), + shard: Some(12), + }))); + + // Get the duties for the first time... + assert_eq!(manager.poll(), Ok(PollOutcome::NewDuties)); + // Get the same duties again... + assert_eq!(manager.poll(), Ok(PollOutcome::NoChange)); + + // Return new duties. + beacon_node.set_next_shuffling_result(Ok(Some(EpochDuties{ + block_production_slot: Some(11), + shard: Some(12), + }))); + assert_eq!(manager.poll(), Ok(PollOutcome::DutiesChanged)); + + // Return no duties. + beacon_node.set_next_shuffling_result(Ok(None)); + assert_eq!(manager.poll(), Ok(PollOutcome::UnknownValidatorOrEpoch)); + } +} diff --git a/validator_client/src/duties/service.rs b/validator_client/src/duties/service.rs new file mode 100644 index 000000000..10de53634 --- /dev/null +++ b/validator_client/src/duties/service.rs @@ -0,0 +1,33 @@ +use super::traits::{BeaconNode, BeaconNodeError}; +use super::{DutiesManager, PollOutcome}; +use slog::{debug, error, info, warn, Logger}; +use slot_clock::SlotClock; +use std::time::Duration; + +pub struct DutiesService { + pub manager: DutiesManager, + pub poll_interval_millis: u64, + pub log: Logger, +} + +impl DutiesService { + pub fn run(&mut self) { + loop { + match self.manager.poll() { + Err(error) => { + error!(self.log, "Epoch duties poll error"; "error" => format!("{:?}", error)) + } + Ok(PollOutcome::NoChange) => debug!(self.log, "No change in duties"), + Ok(PollOutcome::DutiesChanged) => { + info!(self.log, "Duties changed (potential re-org)") + } + Ok(PollOutcome::NewDuties) => info!(self.log, "New duties obtained"), + Ok(PollOutcome::UnknownValidatorOrEpoch) => { + error!(self.log, "Epoch or validator unknown") + } + }; + + std::thread::sleep(Duration::from_millis(self.poll_interval_millis)); + } + } +} diff --git a/validator_client/src/duties/test_node.rs b/validator_client/src/duties/test_node.rs new file mode 100644 index 000000000..4ec002224 --- /dev/null +++ b/validator_client/src/duties/test_node.rs @@ -0,0 +1,28 @@ +use super::traits::{BeaconNode, BeaconNodeError}; +use super::EpochDuties; +use bls::PublicKey; +use std::sync::RwLock; + +type ShufflingResult = Result, BeaconNodeError>; + +#[derive(Default)] +pub struct TestBeaconNode { + pub request_shuffling_input: RwLock>, + pub request_shuffling_result: RwLock>, +} + +impl TestBeaconNode { + pub fn set_next_shuffling_result(&self, result: ShufflingResult) { + *self.request_shuffling_result.write().unwrap() = Some(result); + } +} + +impl BeaconNode for TestBeaconNode { + fn request_shuffling(&self, epoch: u64, public_key: &PublicKey) -> ShufflingResult { + *self.request_shuffling_input.write().unwrap() = Some((epoch, public_key.clone())); + match *self.request_shuffling_result.read().unwrap() { + Some(ref r) => r.clone(), + None => panic!("TestBeaconNode: produce_result == None"), + } + } +} diff --git a/validator_client/src/duties/traits.rs b/validator_client/src/duties/traits.rs new file mode 100644 index 000000000..eb0f1583e --- /dev/null +++ b/validator_client/src/duties/traits.rs @@ -0,0 +1,16 @@ +use super::EpochDuties; +use bls::PublicKey; + +#[derive(Debug, PartialEq, Clone)] +pub enum BeaconNodeError { + RemoteFailure(String), + DecodeFailure, +} + +pub trait BeaconNode: Send + Sync { + fn request_shuffling( + &self, + epoch: u64, + public_key: &PublicKey, + ) -> Result, BeaconNodeError>; +} diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index 53c289d14..a507b8b4b 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -1,34 +1,80 @@ use crate::block_producer::{BlockProducer, BlockProducerService}; +use crate::config::ClientConfig; +use clap::{App, Arg}; use grpcio::{ChannelBuilder, EnvBuilder}; use protos::services_grpc::BeaconBlockServiceClient; -use slog::{info, o, Drain}; +use slog::{error, info, o, Drain}; use slot_clock::SystemTimeSlotClock; use spec::ChainSpec; use std::collections::HashMap; +use std::path::PathBuf; use std::sync::{Arc, RwLock}; mod block_producer; +mod config; +mod duties; fn main() { - // gRPC - let env = Arc::new(EnvBuilder::new().build()); - let ch = ChannelBuilder::new(env).connect("localhost:50051"); - let client = Arc::new(BeaconBlockServiceClient::new(ch)); - // Logging let decorator = slog_term::TermDecorator::new().build(); let drain = slog_term::CompactFormat::new(decorator).build().fuse(); let drain = slog_async::Async::new(drain).build().fuse(); let log = slog::Logger::root(drain, o!()); + // CLI + let matches = App::new("Lighthouse Validator Client") + .version("0.0.1") + .author("Sigma Prime ") + .about("Eth 2.0 Validator Client") + .arg( + Arg::with_name("datadir") + .long("datadir") + .value_name("DIR") + .help("Data directory for keys and databases.") + .takes_value(true), + ) + .arg( + Arg::with_name("server") + .long("server") + .value_name("server") + .help("Address to connect to BeaconNode.") + .takes_value(true), + ) + .get_matches(); + + let mut config = ClientConfig::default(); + + // Custom datadir + if let Some(dir) = matches.value_of("datadir") { + config.data_dir = PathBuf::from(dir.to_string()); + } + + // Custom server port + if let Some(server_str) = matches.value_of("server") { + if let Ok(addr) = server_str.parse::() { + config.server = addr.to_string(); + } else { + error!(log, "Invalid address"; "server" => server_str); + return; + } + } + + // Log configuration + info!(log, ""; + "data_dir" => &config.data_dir.to_str(), + "server" => &config.server); + + // gRPC + let env = Arc::new(EnvBuilder::new().build()); + let ch = ChannelBuilder::new(env).connect(&config.server); + let client = Arc::new(BeaconBlockServiceClient::new(ch)); + // Ethereum + // + // TODO: Permit loading a custom spec from file. let spec = Arc::new(ChainSpec::foundation()); - let duration = spec - .slot_duration - .checked_mul(1_000) - .expect("Slot duration overflow when converting from seconds to millis."); - + // Global map of epoch -> validator duties. let epoch_map = Arc::new(RwLock::new(HashMap::new())); let slot_clock = { info!(log, "Genesis time"; "unix_epoch_seconds" => spec.genesis_time); @@ -40,32 +86,14 @@ fn main() { let block_producer = BlockProducer::new(spec.clone(), epoch_map.clone(), slot_clock.clone(), client); - info!(log, "Slot duration"; "milliseconds" => duration); + let poll_interval_millis = spec.slot_duration * 1000 / 10; // 10% epoch time precision. + info!(log, "Starting block producer service"; "polls_per_epoch" => spec.slot_duration * 1000 / poll_interval_millis); let mut block_producer_service = BlockProducerService { block_producer, - poll_interval_millis: spec.epoch_length * 1000 / 100, // 1% epoch time precision. + poll_interval_millis, log: log.clone(), }; block_producer_service.run(); } - -#[derive(Debug, PartialEq, Clone, Copy, Default)] -pub struct EpochDuties { - block_production_slot: Option, - shard: Option, -} - -impl EpochDuties { - pub fn is_block_production_slot(&self, slot: u64) -> bool { - match self.block_production_slot { - Some(s) if s == slot => true, - _ => false, - } - } - - pub fn has_shard(&self) -> bool { - self.shard.is_some() - } -}