Epoch duties update

This commit is contained in:
Age Manning 2019-03-27 19:47:08 +11:00
parent bc8ec51fe5
commit 46181408ba
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
6 changed files with 152 additions and 124 deletions

View File

@ -97,7 +97,7 @@ message BeaconBlock {
// the public keys of the validators
message Validators {
repeated bytes public_key = 1;
repeated bytes public_keys = 1;
}
// Propose slot
@ -107,11 +107,11 @@ message GetDutiesRequest {
}
message GetDutiesResponse {
repeated ActiveValidator active_validator = 1;
repeated ActiveValidator active_validators = 1;
}
message ActiveValidator {
oneof slot_oneof {
oneof duty_oneof {
bool none = 1;
ValidatorDuty duty = 2;
}
@ -120,7 +120,7 @@ message ActiveValidator {
message ValidatorDuty {
oneof block_oneof {
bool none = 1;
uint64 block_produce_slot = 2;
uint64 block_production_slot = 2;
}
uint64 committee_slot = 3;
uint64 committee_shard = 4;

View File

@ -1,7 +1,13 @@
use block_proposer::{DutiesReader, DutiesReaderError};
use std::collections::HashMap;
use std::sync::RwLock;
use types::{Epoch, Fork, PublicKey, Slot};
use std::ops::{Deref, DerefMut};
use types::{Epoch, PublicKey, Slot};
/// The type of work a validator is required to do in a given slot.
#[derive(Debug, Clone)]
pub struct WorkType {
produce_block: bool,
produce_attestation: bool,
}
/// The information required for a validator to propose and attest during some epoch.
///
@ -17,84 +23,91 @@ pub struct EpochDuty {
}
impl EpochDuty {
/// Returns `true` if work needs to be done in the supplied `slot`
pub fn is_work_slot(&self, slot: Slot) -> bool {
/// Returns `WorkType` if work needs to be done in the supplied `slot`
pub fn is_work_slot(&self, slot: Slot) -> Option<WorkType> {
// if validator is required to produce a slot return true
match self.block_production_slot {
Some(s) if s == slot => return true,
let produce_block = match self.block_production_slot {
Some(s) if s == slot => true,
_ => false,
};
let mut produce_attestation = false;
if self.committee_slot == slot {
produce_attestation = true;
}
if self.committee_slot == slot {
return true;
if produce_block | produce_attestation {
return Some(WorkType {
produce_block,
produce_attestation,
});
}
return false;
None
}
}
/// Maps a list of public keys (many validators) to an EpochDuty.
pub struct EpochDuties {
inner: HashMap<PublicKey, Option<EpochDuty>>,
}
pub type EpochDuties = HashMap<PublicKey, Option<EpochDuty>>;
pub enum EpochDutiesMapError {
Poisoned,
UnknownEpoch,
UnknownValidator,
}
/// Maps an `epoch` to some `EpochDuties` for a single validator.
pub struct EpochDutiesMap {
pub slots_per_epoch: u64,
pub map: RwLock<HashMap<Epoch, EpochDuties>>,
pub map: HashMap<Epoch, EpochDuties>,
}
impl EpochDutiesMap {
pub fn new(slots_per_epoch: u64) -> Self {
Self {
slots_per_epoch,
map: RwLock::new(HashMap::new()),
map: HashMap::new(),
}
}
pub fn get(&self, epoch: Epoch) -> Result<Option<EpochDuties>, EpochDutiesMapError> {
let map = self.map.read().map_err(|_| EpochDutiesMapError::Poisoned)?;
match map.get(&epoch) {
Some(duties) => Ok(Some(*duties)),
None => Ok(None),
}
}
pub fn insert(
&self,
epoch: Epoch,
epoch_duties: EpochDuties,
) -> Result<Option<EpochDuties>, EpochDutiesMapError> {
let mut map = self
.map
.write()
.map_err(|_| EpochDutiesMapError::Poisoned)?;
Ok(map.insert(epoch, epoch_duties))
}
}
impl DutiesReader for EpochDutiesMap {
fn is_block_production_slot(&self, slot: Slot) -> Result<bool, DutiesReaderError> {
// Expose the hashmap methods
impl Deref for EpochDutiesMap {
type Target = HashMap<Epoch, EpochDuties>;
fn deref(&self) -> &Self::Target {
&self.map
}
}
impl DerefMut for EpochDutiesMap {
fn deref_mut(&mut self) -> &mut HashMap<Epoch, EpochDuties> {
&mut self.map
}
}
impl EpochDutiesMap {
/// Checks if the validator has work to do.
fn is_work_slot(
&self,
slot: Slot,
pubkey: &PublicKey,
) -> Result<Option<WorkType>, EpochDutiesMapError> {
let epoch = slot.epoch(self.slots_per_epoch);
let map = self.map.read().map_err(|_| DutiesReaderError::Poisoned)?;
let duties = map
let epoch_duties = self
.map
.get(&epoch)
.ok_or_else(|| DutiesReaderError::UnknownEpoch)?;
Ok(duties.is_block_production_slot(slot))
}
fn fork(&self) -> Result<Fork, DutiesReaderError> {
// TODO: this is garbage data.
//
// It will almost certainly cause signatures to fail verification.
Ok(Fork {
previous_version: [0; 4],
current_version: [0; 4],
epoch: Epoch::new(0),
})
.ok_or_else(|| EpochDutiesMapError::UnknownEpoch)?;
if let Some(epoch_duty) = epoch_duties.get(pubkey) {
if let Some(duty) = epoch_duty {
// Retrieves the duty for a validator at a given slot
return Ok(duty.is_work_slot(slot));
} else {
// the validator isn't active
return Ok(None);
}
} else {
// validator isn't known
return Err(EpochDutiesMapError::UnknownValidator);
}
}
}

View File

@ -1,8 +1,6 @@
use super::epoch_duties::{EpochDuties, EpochDuty};
use super::traits::{BeaconNode, BeaconNodeError};
use protos::services::{
ActiveValidator, GetDutiesRequest, GetDutiesResponse, ValidatorDuty, Validators,
};
use protos::services::{GetDutiesRequest, Validators};
use protos::services_grpc::ValidatorServiceClient;
use ssz::ssz_encode;
use std::collections::HashMap;
@ -19,10 +17,8 @@ impl BeaconNode for ValidatorServiceClient {
// build the request
let mut req = GetDutiesRequest::new();
req.set_epoch(epoch.as_u64());
let validators = Validators::new().mut_public_key();
for pubkey in pubkeys {
validators.push(pubkey);
}
let mut validators = Validators::new();
validators.set_public_keys(pubkeys.iter().map(|v| ssz_encode(v)).collect());
req.set_validators(validators);
// send the request, get the duties reply
@ -30,24 +26,29 @@ impl BeaconNode for ValidatorServiceClient {
.get_validator_duties(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
let mut epoch_duties: HashMap<PublicKey, Option<EpochDuties>> = HashMap::new();
for (index, validator_duty) in reply.get_active_validator().enumerate() {
if let Some(duty) = validator_duty.has_slot() {
// the validator is active
//build the EpochDuty
let active_duty = duty.get_duty();
let block_produce_slot = active_duty.get_block_produce_slot();
let epoch_duty = EpochDuty {
block_produce_slot,
committee_slot: active_duty.get_committee_slot(),
committee_shard: active_duty.get_committee_shard(),
committee_index: active_duty.get_committee_index(),
};
epoch_duties.insert(pubkeys[index], Some(epoch_duty));
} else {
// validator is not active and has no duties
epoch_duties.insert(pubkeys[index], None);
let mut epoch_duties: HashMap<PublicKey, Option<EpochDuty>> = HashMap::new();
for (index, validator_duty) in reply.get_active_validators().iter().enumerate() {
if !validator_duty.has_duty() {
// validator is inactive
epoch_duties.insert(pubkeys[index].clone(), None);
break;
}
// active validator
let active_duty = validator_duty.get_duty();
let block_production_slot = {
if active_duty.has_block_production_slot() {
Some(Slot::from(active_duty.get_block_production_slot()))
} else {
None
}
};
let epoch_duty = EpochDuty {
block_production_slot,
committee_slot: Slot::from(active_duty.get_committee_slot()),
committee_shard: active_duty.get_committee_shard(),
committee_index: active_duty.get_committee_index(),
};
epoch_duties.insert(pubkeys[index].clone(), Some(epoch_duty));
}
Ok(epoch_duties)
}

View File

@ -1,7 +1,8 @@
mod epoch_duties;
mod grpc;
#[cfg(test)]
mod test_node;
// TODO: reintroduce tests
//#[cfg(test)]
//mod test_node;
mod traits;
pub use self::epoch_duties::EpochDutiesMap;
@ -11,9 +12,10 @@ use bls::PublicKey;
use futures::Async;
use slog::{debug, error, info};
use std::sync::Arc;
use types::{Epoch, Slot};
use std::sync::RwLock;
use types::Epoch;
#[derive(Debug, PartialEq, Clone, Copy)]
#[derive(Debug, PartialEq, Clone)]
pub enum UpdateOutcome {
/// The `EpochDuties` were not updated during this poll.
NoChange(Epoch),
@ -29,8 +31,11 @@ pub enum UpdateOutcome {
#[derive(Debug, PartialEq)]
pub enum Error {
DutiesMapPoisoned,
EpochMapPoisoned,
BeaconNodeError(BeaconNodeError),
UnknownEpoch,
UnknownValidator,
}
/// A polling state machine which ensures the latest `EpochDuties` are obtained from the Beacon
@ -38,43 +43,37 @@ pub enum Error {
///
/// This keeps track of all validator keys and required voting slots.
pub struct DutiesManager<U: BeaconNode> {
pub duties_map: Arc<EpochDutiesMap>,
pub duties_map: RwLock<EpochDutiesMap>,
/// A list of all public keys known to the validator service.
pub pubkeys: Vec<PublicKey>,
pub slots_per_epoch: u64,
pub beacon_node: Arc<U>,
}
impl<U: BeaconNode> DutiesManager<U> {
/// Check the Beacon Node for `EpochDuties`.
///
/// The present `epoch` will be learned from the supplied `SlotClock`. In production this will
/// be a wall-clock (e.g., system time, remote server time, etc.).
fn update(&self, slot: Slot) -> Result<UpdateOutcome, Error> {
let epoch = slot.epoch(self.slots_per_epoch);
if let Some(duties) = self.beacon_node.request_shuffling(epoch, &self.pubkeys)? {
// If these duties were known, check to see if they're updates or identical.
let result = if let Some(known_duties) = self.duties_map.get(epoch)? {
if known_duties == duties {
Ok(UpdateOutcome::NoChange(epoch))
} else {
Ok(UpdateOutcome::DutiesChanged(epoch, duties))
}
fn update(&self, epoch: Epoch) -> Result<UpdateOutcome, Error> {
let duties = self.beacon_node.request_duties(epoch, &self.pubkeys)?;
// If these duties were known, check to see if they're updates or identical.
let result = if let Some(known_duties) = self.duties_map.read()?.get(&epoch) {
if *known_duties == duties {
return Ok(UpdateOutcome::NoChange(epoch));
} else {
Ok(UpdateOutcome::NewDuties(epoch, duties))
};
self.duties_map.insert(epoch, duties)?;
result
//TODO: Duties could be large here. Remove from display and avoid the clone.
return Ok(UpdateOutcome::DutiesChanged(epoch, duties.clone()));
}
} else {
Ok(UpdateOutcome::UnknownValidatorOrEpoch(epoch))
}
Ok(UpdateOutcome::NewDuties(epoch, duties.clone()))
};
self.duties_map.write()?.insert(epoch, duties);
result
}
/// A future wrapping around `update()`. This will perform logic based upon the update
/// process and complete once the update has completed.
pub fn run_update(&self, slot: Slot, log: slog::Logger) -> Result<Async<()>, ()> {
match self.update(slot) {
pub fn run_update(&self, epoch: Epoch, log: slog::Logger) -> Result<Async<()>, ()> {
match self.update(epoch) {
Err(error) => error!(log, "Epoch duties poll error"; "error" => format!("{:?}", error)),
Ok(UpdateOutcome::NoChange(epoch)) => {
debug!(log, "No change in duties"; "epoch" => epoch)
@ -93,16 +92,25 @@ impl<U: BeaconNode> DutiesManager<U> {
}
}
//TODO: Use error_chain to handle errors
impl From<BeaconNodeError> for Error {
fn from(e: BeaconNodeError) -> Error {
Error::BeaconNodeError(e)
}
}
//TODO: Use error_chain to handle errors
impl<T> From<std::sync::PoisonError<T>> for Error {
fn from(e: std::sync::PoisonError<T>) -> Error {
Error::DutiesMapPoisoned
}
}
impl From<EpochDutiesMapError> for Error {
fn from(e: EpochDutiesMapError) -> Error {
match e {
EpochDutiesMapError::Poisoned => Error::EpochMapPoisoned,
EpochDutiesMapError::UnknownEpoch => Error::UnknownEpoch,
EpochDutiesMapError::UnknownValidator => Error::UnknownValidator,
}
}
}

View File

@ -16,5 +16,5 @@ pub trait BeaconNode: Send + Sync {
&self,
epoch: Epoch,
pubkeys: &[PublicKey],
) -> Result<Vec<Option<EpochDuties>>, BeaconNodeError>;
) -> Result<EpochDuties, BeaconNodeError>;
}

View File

@ -37,12 +37,14 @@ pub struct Service {
chain_id: u16,
/// The fork state we processing on.
fork: Fork,
/// The slot clock keeping track of time.
slot_clock: Arc<SystemTimeSlotClock>,
/// The slot clock for this service.
slot_clock: SystemTimeSlotClock,
/// The current slot we are processing.
current_slot: Slot,
/// Duration until the next slot. This is used for initializing the tokio timer interval.
duration_to_next_slot: Duration,
/// The number of slots per epoch to allow for converting slots to epochs.
slots_per_epoch: u64,
// GRPC Clients
/// The beacon block GRPC client.
beacon_block_client: Arc<BeaconBlockServiceClient>,
@ -74,7 +76,7 @@ impl Service {
// retrieve node information
let node_info = loop {
let info = match beacon_node_client.info(&Empty::new()) {
match beacon_node_client.info(&Empty::new()) {
Err(e) => {
warn!(log, "Could not connect to node. Error: {}", e);
info!(log, "Retrying in 5 seconds...");
@ -115,13 +117,6 @@ impl Service {
epoch: Epoch::from(proto_fork.get_epoch()),
};
// build the validator slot clock
let slot_clock = {
let clock = SystemTimeSlotClock::new(genesis_time, config.spec.seconds_per_slot)
.expect("Unable to instantiate SystemTimeSlotClock.");
Arc::new(clock)
};
// initialize the RPC clients
// Beacon node gRPC beacon block endpoints.
@ -142,6 +137,10 @@ impl Service {
Arc::new(AttestationServiceClient::new(ch))
};
// build the validator slot clock
let slot_clock = SystemTimeSlotClock::new(genesis_time, config.spec.seconds_per_slot)
.expect("Unable to instantiate SystemTimeSlotClock.");
let current_slot = slot_clock
.present_slot()
.map_err(|e| ErrorKind::SlotClockError(e))?
@ -179,6 +178,7 @@ impl Service {
slot_clock,
current_slot,
duration_to_next_slot,
slots_per_epoch: config.spec.slots_per_epoch,
beacon_block_client,
validator_client,
attester_client,
@ -211,7 +211,7 @@ impl Service {
)
};
// kick off core service
/* kick off core service */
// generate keypairs
@ -219,14 +219,18 @@ impl Service {
// https://github.com/sigp/lighthouse/issues/160
let keypairs = Arc::new(vec![Keypair::random()]);
// build requisite objects to pass to core thread.
let duties_map = Arc::new(EpochDutiesMap::new(config.spec.slots_per_epoch));
let epoch_map_for_attester = Arc::new(EpochMap::new(config.spec.slots_per_epoch));
/* build requisite objects to pass to core thread */
// Builds a mapping of Epoch -> Map(PublicKey, EpochDuty)
// where EpochDuty contains slot numbers and attestation data that each validator needs to
// produce work on.
let duties_map = EpochDutiesMap::new(config.spec.slots_per_epoch);
// builds a manager which maintains the list of current duties for all known validators
// and can check when a validator needs to perform a task.
let manager = Arc::new(DutiesManager {
duties_map,
pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(),
slots_per_epoch: config.spec.slots_per_epoch.clone(),
beacon_node: service.validator_client.clone(),
});
@ -244,6 +248,8 @@ impl Service {
Ok(slot) => slot.expect("Genesis is in the future"),
};
let current_epoch = current_slot.epoch(service.slots_per_epoch);
debug_assert!(
current_slot > service.current_slot,
"The Timer should poll a new slot"
@ -252,9 +258,9 @@ impl Service {
info!(log, "Processing slot: {}", current_slot.as_u64());
// check for new duties
let cloned_manager = manager.clone();
let mut cloned_manager = manager.clone();
tokio::spawn(futures::future::poll_fn(move || {
cloned_manager.run_update(current_slot.clone(), log.clone())
cloned_manager.run_update(current_epoch.clone(), log.clone())
}));
// execute any specified duties