diff --git a/protos/src/services.proto b/protos/src/services.proto index dea9b7a37..7a4bbc977 100644 --- a/protos/src/services.proto +++ b/protos/src/services.proto @@ -97,7 +97,7 @@ message BeaconBlock { // the public keys of the validators message Validators { - repeated bytes public_key = 1; + repeated bytes public_keys = 1; } // Propose slot @@ -107,11 +107,11 @@ message GetDutiesRequest { } message GetDutiesResponse { - repeated ActiveValidator active_validator = 1; + repeated ActiveValidator active_validators = 1; } message ActiveValidator { - oneof slot_oneof { + oneof duty_oneof { bool none = 1; ValidatorDuty duty = 2; } @@ -120,7 +120,7 @@ message ActiveValidator { message ValidatorDuty { oneof block_oneof { bool none = 1; - uint64 block_produce_slot = 2; + uint64 block_production_slot = 2; } uint64 committee_slot = 3; uint64 committee_shard = 4; diff --git a/validator_client/src/duties/epoch_duties.rs b/validator_client/src/duties/epoch_duties.rs index 41d3a24c2..de787c4b8 100644 --- a/validator_client/src/duties/epoch_duties.rs +++ b/validator_client/src/duties/epoch_duties.rs @@ -1,7 +1,13 @@ -use block_proposer::{DutiesReader, DutiesReaderError}; use std::collections::HashMap; -use std::sync::RwLock; -use types::{Epoch, Fork, PublicKey, Slot}; +use std::ops::{Deref, DerefMut}; +use types::{Epoch, PublicKey, Slot}; + +/// The type of work a validator is required to do in a given slot. +#[derive(Debug, Clone)] +pub struct WorkType { + produce_block: bool, + produce_attestation: bool, +} /// The information required for a validator to propose and attest during some epoch. /// @@ -17,84 +23,91 @@ pub struct EpochDuty { } impl EpochDuty { - /// Returns `true` if work needs to be done in the supplied `slot` - pub fn is_work_slot(&self, slot: Slot) -> bool { + /// Returns `WorkType` if work needs to be done in the supplied `slot` + pub fn is_work_slot(&self, slot: Slot) -> Option { // if validator is required to produce a slot return true - match self.block_production_slot { - Some(s) if s == slot => return true, + let produce_block = match self.block_production_slot { + Some(s) if s == slot => true, _ => false, + }; + + let mut produce_attestation = false; + if self.committee_slot == slot { + produce_attestation = true; } - if self.committee_slot == slot { - return true; + if produce_block | produce_attestation { + return Some(WorkType { + produce_block, + produce_attestation, + }); } - return false; + None } } /// Maps a list of public keys (many validators) to an EpochDuty. -pub struct EpochDuties { - inner: HashMap>, -} +pub type EpochDuties = HashMap>; pub enum EpochDutiesMapError { Poisoned, + UnknownEpoch, + UnknownValidator, } /// Maps an `epoch` to some `EpochDuties` for a single validator. pub struct EpochDutiesMap { pub slots_per_epoch: u64, - pub map: RwLock>, + pub map: HashMap, } impl EpochDutiesMap { pub fn new(slots_per_epoch: u64) -> Self { Self { slots_per_epoch, - map: RwLock::new(HashMap::new()), + map: HashMap::new(), } } - - pub fn get(&self, epoch: Epoch) -> Result, EpochDutiesMapError> { - let map = self.map.read().map_err(|_| EpochDutiesMapError::Poisoned)?; - match map.get(&epoch) { - Some(duties) => Ok(Some(*duties)), - None => Ok(None), - } - } - - pub fn insert( - &self, - epoch: Epoch, - epoch_duties: EpochDuties, - ) -> Result, EpochDutiesMapError> { - let mut map = self - .map - .write() - .map_err(|_| EpochDutiesMapError::Poisoned)?; - Ok(map.insert(epoch, epoch_duties)) - } } -impl DutiesReader for EpochDutiesMap { - fn is_block_production_slot(&self, slot: Slot) -> Result { +// Expose the hashmap methods +impl Deref for EpochDutiesMap { + type Target = HashMap; + + fn deref(&self) -> &Self::Target { + &self.map + } +} +impl DerefMut for EpochDutiesMap { + fn deref_mut(&mut self) -> &mut HashMap { + &mut self.map + } +} + +impl EpochDutiesMap { + /// Checks if the validator has work to do. + fn is_work_slot( + &self, + slot: Slot, + pubkey: &PublicKey, + ) -> Result, EpochDutiesMapError> { let epoch = slot.epoch(self.slots_per_epoch); - let map = self.map.read().map_err(|_| DutiesReaderError::Poisoned)?; - let duties = map + let epoch_duties = self + .map .get(&epoch) - .ok_or_else(|| DutiesReaderError::UnknownEpoch)?; - Ok(duties.is_block_production_slot(slot)) - } - - fn fork(&self) -> Result { - // TODO: this is garbage data. - // - // It will almost certainly cause signatures to fail verification. - Ok(Fork { - previous_version: [0; 4], - current_version: [0; 4], - epoch: Epoch::new(0), - }) + .ok_or_else(|| EpochDutiesMapError::UnknownEpoch)?; + if let Some(epoch_duty) = epoch_duties.get(pubkey) { + if let Some(duty) = epoch_duty { + // Retrieves the duty for a validator at a given slot + return Ok(duty.is_work_slot(slot)); + } else { + // the validator isn't active + return Ok(None); + } + } else { + // validator isn't known + return Err(EpochDutiesMapError::UnknownValidator); + } } } diff --git a/validator_client/src/duties/grpc.rs b/validator_client/src/duties/grpc.rs index 91da512e3..32ac86435 100644 --- a/validator_client/src/duties/grpc.rs +++ b/validator_client/src/duties/grpc.rs @@ -1,8 +1,6 @@ use super::epoch_duties::{EpochDuties, EpochDuty}; use super::traits::{BeaconNode, BeaconNodeError}; -use protos::services::{ - ActiveValidator, GetDutiesRequest, GetDutiesResponse, ValidatorDuty, Validators, -}; +use protos::services::{GetDutiesRequest, Validators}; use protos::services_grpc::ValidatorServiceClient; use ssz::ssz_encode; use std::collections::HashMap; @@ -19,10 +17,8 @@ impl BeaconNode for ValidatorServiceClient { // build the request let mut req = GetDutiesRequest::new(); req.set_epoch(epoch.as_u64()); - let validators = Validators::new().mut_public_key(); - for pubkey in pubkeys { - validators.push(pubkey); - } + let mut validators = Validators::new(); + validators.set_public_keys(pubkeys.iter().map(|v| ssz_encode(v)).collect()); req.set_validators(validators); // send the request, get the duties reply @@ -30,24 +26,29 @@ impl BeaconNode for ValidatorServiceClient { .get_validator_duties(&req) .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; - let mut epoch_duties: HashMap> = HashMap::new(); - for (index, validator_duty) in reply.get_active_validator().enumerate() { - if let Some(duty) = validator_duty.has_slot() { - // the validator is active - //build the EpochDuty - let active_duty = duty.get_duty(); - let block_produce_slot = active_duty.get_block_produce_slot(); - let epoch_duty = EpochDuty { - block_produce_slot, - committee_slot: active_duty.get_committee_slot(), - committee_shard: active_duty.get_committee_shard(), - committee_index: active_duty.get_committee_index(), - }; - epoch_duties.insert(pubkeys[index], Some(epoch_duty)); - } else { - // validator is not active and has no duties - epoch_duties.insert(pubkeys[index], None); + let mut epoch_duties: HashMap> = HashMap::new(); + for (index, validator_duty) in reply.get_active_validators().iter().enumerate() { + if !validator_duty.has_duty() { + // validator is inactive + epoch_duties.insert(pubkeys[index].clone(), None); + break; } + // active validator + let active_duty = validator_duty.get_duty(); + let block_production_slot = { + if active_duty.has_block_production_slot() { + Some(Slot::from(active_duty.get_block_production_slot())) + } else { + None + } + }; + let epoch_duty = EpochDuty { + block_production_slot, + committee_slot: Slot::from(active_duty.get_committee_slot()), + committee_shard: active_duty.get_committee_shard(), + committee_index: active_duty.get_committee_index(), + }; + epoch_duties.insert(pubkeys[index].clone(), Some(epoch_duty)); } Ok(epoch_duties) } diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs index 77d6d43b6..20a477910 100644 --- a/validator_client/src/duties/mod.rs +++ b/validator_client/src/duties/mod.rs @@ -1,7 +1,8 @@ mod epoch_duties; mod grpc; -#[cfg(test)] -mod test_node; +// TODO: reintroduce tests +//#[cfg(test)] +//mod test_node; mod traits; pub use self::epoch_duties::EpochDutiesMap; @@ -11,9 +12,10 @@ use bls::PublicKey; use futures::Async; use slog::{debug, error, info}; use std::sync::Arc; -use types::{Epoch, Slot}; +use std::sync::RwLock; +use types::Epoch; -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Debug, PartialEq, Clone)] pub enum UpdateOutcome { /// The `EpochDuties` were not updated during this poll. NoChange(Epoch), @@ -29,8 +31,11 @@ pub enum UpdateOutcome { #[derive(Debug, PartialEq)] pub enum Error { + DutiesMapPoisoned, EpochMapPoisoned, BeaconNodeError(BeaconNodeError), + UnknownEpoch, + UnknownValidator, } /// A polling state machine which ensures the latest `EpochDuties` are obtained from the Beacon @@ -38,43 +43,37 @@ pub enum Error { /// /// This keeps track of all validator keys and required voting slots. pub struct DutiesManager { - pub duties_map: Arc, + pub duties_map: RwLock, /// A list of all public keys known to the validator service. pub pubkeys: Vec, - pub slots_per_epoch: u64, pub beacon_node: Arc, } impl DutiesManager { /// Check the Beacon Node for `EpochDuties`. /// - /// The present `epoch` will be learned from the supplied `SlotClock`. In production this will /// be a wall-clock (e.g., system time, remote server time, etc.). - fn update(&self, slot: Slot) -> Result { - let epoch = slot.epoch(self.slots_per_epoch); - - if let Some(duties) = self.beacon_node.request_shuffling(epoch, &self.pubkeys)? { - // If these duties were known, check to see if they're updates or identical. - let result = if let Some(known_duties) = self.duties_map.get(epoch)? { - if known_duties == duties { - Ok(UpdateOutcome::NoChange(epoch)) - } else { - Ok(UpdateOutcome::DutiesChanged(epoch, duties)) - } + fn update(&self, epoch: Epoch) -> Result { + let duties = self.beacon_node.request_duties(epoch, &self.pubkeys)?; + // If these duties were known, check to see if they're updates or identical. + let result = if let Some(known_duties) = self.duties_map.read()?.get(&epoch) { + if *known_duties == duties { + return Ok(UpdateOutcome::NoChange(epoch)); } else { - Ok(UpdateOutcome::NewDuties(epoch, duties)) - }; - self.duties_map.insert(epoch, duties)?; - result + //TODO: Duties could be large here. Remove from display and avoid the clone. + return Ok(UpdateOutcome::DutiesChanged(epoch, duties.clone())); + } } else { - Ok(UpdateOutcome::UnknownValidatorOrEpoch(epoch)) - } + Ok(UpdateOutcome::NewDuties(epoch, duties.clone())) + }; + self.duties_map.write()?.insert(epoch, duties); + result } /// A future wrapping around `update()`. This will perform logic based upon the update /// process and complete once the update has completed. - pub fn run_update(&self, slot: Slot, log: slog::Logger) -> Result, ()> { - match self.update(slot) { + pub fn run_update(&self, epoch: Epoch, log: slog::Logger) -> Result, ()> { + match self.update(epoch) { Err(error) => error!(log, "Epoch duties poll error"; "error" => format!("{:?}", error)), Ok(UpdateOutcome::NoChange(epoch)) => { debug!(log, "No change in duties"; "epoch" => epoch) @@ -93,16 +92,25 @@ impl DutiesManager { } } +//TODO: Use error_chain to handle errors impl From for Error { fn from(e: BeaconNodeError) -> Error { Error::BeaconNodeError(e) } } +//TODO: Use error_chain to handle errors +impl From> for Error { + fn from(e: std::sync::PoisonError) -> Error { + Error::DutiesMapPoisoned + } +} impl From for Error { fn from(e: EpochDutiesMapError) -> Error { match e { EpochDutiesMapError::Poisoned => Error::EpochMapPoisoned, + EpochDutiesMapError::UnknownEpoch => Error::UnknownEpoch, + EpochDutiesMapError::UnknownValidator => Error::UnknownValidator, } } } diff --git a/validator_client/src/duties/traits.rs b/validator_client/src/duties/traits.rs index 3aa8fbab2..374bed9f6 100644 --- a/validator_client/src/duties/traits.rs +++ b/validator_client/src/duties/traits.rs @@ -16,5 +16,5 @@ pub trait BeaconNode: Send + Sync { &self, epoch: Epoch, pubkeys: &[PublicKey], - ) -> Result>, BeaconNodeError>; + ) -> Result; } diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index e32271299..c8e03e73d 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -37,12 +37,14 @@ pub struct Service { chain_id: u16, /// The fork state we processing on. fork: Fork, - /// The slot clock keeping track of time. - slot_clock: Arc, + /// The slot clock for this service. + slot_clock: SystemTimeSlotClock, /// The current slot we are processing. current_slot: Slot, /// Duration until the next slot. This is used for initializing the tokio timer interval. duration_to_next_slot: Duration, + /// The number of slots per epoch to allow for converting slots to epochs. + slots_per_epoch: u64, // GRPC Clients /// The beacon block GRPC client. beacon_block_client: Arc, @@ -74,7 +76,7 @@ impl Service { // retrieve node information let node_info = loop { - let info = match beacon_node_client.info(&Empty::new()) { + match beacon_node_client.info(&Empty::new()) { Err(e) => { warn!(log, "Could not connect to node. Error: {}", e); info!(log, "Retrying in 5 seconds..."); @@ -115,13 +117,6 @@ impl Service { epoch: Epoch::from(proto_fork.get_epoch()), }; - // build the validator slot clock - let slot_clock = { - let clock = SystemTimeSlotClock::new(genesis_time, config.spec.seconds_per_slot) - .expect("Unable to instantiate SystemTimeSlotClock."); - Arc::new(clock) - }; - // initialize the RPC clients // Beacon node gRPC beacon block endpoints. @@ -142,6 +137,10 @@ impl Service { Arc::new(AttestationServiceClient::new(ch)) }; + // build the validator slot clock + let slot_clock = SystemTimeSlotClock::new(genesis_time, config.spec.seconds_per_slot) + .expect("Unable to instantiate SystemTimeSlotClock."); + let current_slot = slot_clock .present_slot() .map_err(|e| ErrorKind::SlotClockError(e))? @@ -179,6 +178,7 @@ impl Service { slot_clock, current_slot, duration_to_next_slot, + slots_per_epoch: config.spec.slots_per_epoch, beacon_block_client, validator_client, attester_client, @@ -211,7 +211,7 @@ impl Service { ) }; - // kick off core service + /* kick off core service */ // generate keypairs @@ -219,14 +219,18 @@ impl Service { // https://github.com/sigp/lighthouse/issues/160 let keypairs = Arc::new(vec![Keypair::random()]); - // build requisite objects to pass to core thread. - let duties_map = Arc::new(EpochDutiesMap::new(config.spec.slots_per_epoch)); - let epoch_map_for_attester = Arc::new(EpochMap::new(config.spec.slots_per_epoch)); + /* build requisite objects to pass to core thread */ + // Builds a mapping of Epoch -> Map(PublicKey, EpochDuty) + // where EpochDuty contains slot numbers and attestation data that each validator needs to + // produce work on. + let duties_map = EpochDutiesMap::new(config.spec.slots_per_epoch); + + // builds a manager which maintains the list of current duties for all known validators + // and can check when a validator needs to perform a task. let manager = Arc::new(DutiesManager { duties_map, pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(), - slots_per_epoch: config.spec.slots_per_epoch.clone(), beacon_node: service.validator_client.clone(), }); @@ -244,6 +248,8 @@ impl Service { Ok(slot) => slot.expect("Genesis is in the future"), }; + let current_epoch = current_slot.epoch(service.slots_per_epoch); + debug_assert!( current_slot > service.current_slot, "The Timer should poll a new slot" @@ -252,9 +258,9 @@ impl Service { info!(log, "Processing slot: {}", current_slot.as_u64()); // check for new duties - let cloned_manager = manager.clone(); + let mut cloned_manager = manager.clone(); tokio::spawn(futures::future::poll_fn(move || { - cloned_manager.run_update(current_slot.clone(), log.clone()) + cloned_manager.run_update(current_epoch.clone(), log.clone()) })); // execute any specified duties