Implement work finding logic in validator client

This commit is contained in:
Age Manning 2019-03-27 22:22:51 +11:00
parent a315e9da49
commit 75195bbbf4
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
3 changed files with 49 additions and 22 deletions

View File

@ -5,8 +5,8 @@ use types::{Epoch, PublicKey, Slot};
/// The type of work a validator is required to do in a given slot. /// The type of work a validator is required to do in a given slot.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct WorkType { pub struct WorkType {
produce_block: bool, pub produce_block: bool,
produce_attestation: bool, pub produce_attestation: bool,
} }
/// The information required for a validator to propose and attest during some epoch. /// The information required for a validator to propose and attest during some epoch.
@ -85,7 +85,7 @@ impl DerefMut for EpochDutiesMap {
impl EpochDutiesMap { impl EpochDutiesMap {
/// Checks if the validator has work to do. /// Checks if the validator has work to do.
fn is_work_slot( pub fn is_work_slot(
&self, &self,
slot: Slot, slot: Slot,
pubkey: &PublicKey, pubkey: &PublicKey,

View File

@ -5,15 +5,14 @@ mod grpc;
//mod test_node; //mod test_node;
mod traits; mod traits;
pub use self::epoch_duties::EpochDutiesMap;
use self::epoch_duties::{EpochDuties, EpochDutiesMapError}; use self::epoch_duties::{EpochDuties, EpochDutiesMapError};
pub use self::epoch_duties::{EpochDutiesMap, WorkType};
use self::traits::{BeaconNode, BeaconNodeError}; use self::traits::{BeaconNode, BeaconNodeError};
use bls::PublicKey;
use futures::Async; use futures::Async;
use slog::{debug, error, info}; use slog::{debug, error, info};
use std::sync::Arc; use std::sync::Arc;
use std::sync::RwLock; use std::sync::RwLock;
use types::Epoch; use types::{Epoch, PublicKey, Slot};
#[derive(Debug, PartialEq, Clone)] #[derive(Debug, PartialEq, Clone)]
pub enum UpdateOutcome { pub enum UpdateOutcome {
@ -24,9 +23,6 @@ pub enum UpdateOutcome {
/// New `EpochDuties` were obtained, different to those which were previously known. This is /// New `EpochDuties` were obtained, different to those which were previously known. This is
/// likely to be the result of chain re-organisation. /// likely to be the result of chain re-organisation.
DutiesChanged(Epoch, EpochDuties), DutiesChanged(Epoch, EpochDuties),
/// The Beacon Node was unable to return the duties as the validator is unknown, or the
/// shuffling for the epoch is unknown.
UnknownValidatorOrEpoch(Epoch),
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -56,18 +52,19 @@ impl<U: BeaconNode> DutiesManager<U> {
fn update(&self, epoch: Epoch) -> Result<UpdateOutcome, Error> { fn update(&self, epoch: Epoch) -> Result<UpdateOutcome, Error> {
let duties = self.beacon_node.request_duties(epoch, &self.pubkeys)?; let duties = self.beacon_node.request_duties(epoch, &self.pubkeys)?;
// If these duties were known, check to see if they're updates or identical. // If these duties were known, check to see if they're updates or identical.
let result = if let Some(known_duties) = self.duties_map.read()?.get(&epoch) { if let Some(known_duties) = self.duties_map.read()?.get(&epoch) {
if *known_duties == duties { if *known_duties == duties {
return Ok(UpdateOutcome::NoChange(epoch)); return Ok(UpdateOutcome::NoChange(epoch));
} else { } else {
//TODO: Duties could be large here. Remove from display and avoid the clone. //TODO: Duties could be large here. Remove from display and avoid the clone.
return Ok(UpdateOutcome::DutiesChanged(epoch, duties.clone())); self.duties_map.write()?.insert(epoch, duties.clone());
return Ok(UpdateOutcome::DutiesChanged(epoch, duties));
} }
} else { } else {
Ok(UpdateOutcome::NewDuties(epoch, duties.clone())) //TODO: Remove clone by removing duties from outcome
self.duties_map.write()?.insert(epoch, duties.clone());
return Ok(UpdateOutcome::NewDuties(epoch, duties));
}; };
self.duties_map.write()?.insert(epoch, duties);
result
} }
/// A future wrapping around `update()`. This will perform logic based upon the update /// A future wrapping around `update()`. This will perform logic based upon the update
@ -84,12 +81,30 @@ impl<U: BeaconNode> DutiesManager<U> {
Ok(UpdateOutcome::NewDuties(epoch, duties)) => { Ok(UpdateOutcome::NewDuties(epoch, duties)) => {
info!(log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties)) info!(log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties))
} }
Ok(UpdateOutcome::UnknownValidatorOrEpoch(epoch)) => {
error!(log, "Epoch or validator unknown"; "epoch" => epoch)
}
}; };
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
/// Returns a list of (Public, WorkType) indicating all the validators that have work to perform
/// this slot.
pub fn get_current_work(&self, slot: Slot) -> Option<Vec<(PublicKey, WorkType)>> {
let mut current_work: Vec<(PublicKey, WorkType)> = Vec::new();
// if the map is poisoned, return None
let duties = self.duties_map.read().ok()?;
for validator_pk in &self.pubkeys {
match duties.is_work_slot(slot, &validator_pk) {
Ok(Some(work_type)) => current_work.push((validator_pk.clone(), work_type)),
Ok(None) => {} // No work for this validator
Err(_) => {} // Unknown epoch or validator, no work
}
}
if current_work.is_empty() {
return None;
}
Some(current_work)
}
} }
//TODO: Use error_chain to handle errors //TODO: Use error_chain to handle errors
@ -101,7 +116,7 @@ impl From<BeaconNodeError> for Error {
//TODO: Use error_chain to handle errors //TODO: Use error_chain to handle errors
impl<T> From<std::sync::PoisonError<T>> for Error { impl<T> From<std::sync::PoisonError<T>> for Error {
fn from(e: std::sync::PoisonError<T>) -> Error { fn from(_e: std::sync::PoisonError<T>) -> Error {
Error::DutiesMapPoisoned Error::DutiesMapPoisoned
} }
} }

View File

@ -240,7 +240,7 @@ impl Service {
.block_on(interval.for_each(move |_| { .block_on(interval.for_each(move |_| {
let log = service.log.clone(); let log = service.log.clone();
// get the current slot /* get the current slot and epoch */
let current_slot = match service.slot_clock.present_slot() { let current_slot = match service.slot_clock.present_slot() {
Err(e) => { Err(e) => {
error!(log, "SystemTimeError {:?}", e); error!(log, "SystemTimeError {:?}", e);
@ -258,13 +258,25 @@ impl Service {
info!(log, "Processing slot: {}", current_slot.as_u64()); info!(log, "Processing slot: {}", current_slot.as_u64());
// check for new duties /* check for new duties */
let mut cloned_manager = manager.clone();
let cloned_manager = manager.clone();
tokio::spawn(futures::future::poll_fn(move || { tokio::spawn(futures::future::poll_fn(move || {
cloned_manager.run_update(current_epoch.clone(), log.clone()) cloned_manager.run_update(current_epoch.clone(), log.clone())
})); }));
// execute any specified duties /* execute any specified duties */
if let Some(work) = manager.get_current_work(current_slot) {
for (_public_key, work_type) in work {
if work_type.produce_block {
// TODO: Produce a beacon block in a new thread
}
if work_type.produce_attestation {
//TODO: Produce an attestation in a new thread
}
}
}
Ok(()) Ok(())
})) }))