Wrap the duty manager in a future for its own thread

This commit is contained in:
Age Manning 2019-03-26 11:59:48 +11:00
parent 33d0f29221
commit ffb3d94355
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
3 changed files with 35 additions and 24 deletions

View File

@ -32,3 +32,4 @@ tokio = "0.1.18"
tokio-timer = "0.2.10"
error-chain = "0.12.0"
bincode = "^1.1.2"
futures = "0.1.25"

View File

@ -8,6 +8,8 @@ pub use self::epoch_duties::EpochDutiesMap;
use self::epoch_duties::{EpochDuties, EpochDutiesMapError};
use self::traits::{BeaconNode, BeaconNodeError};
use bls::PublicKey;
use futures::Async;
use slog::{debug, error, info};
use slot_clock::SlotClock;
use std::sync::Arc;
use types::{ChainSpec, Epoch, Slot};
@ -52,7 +54,7 @@ impl<T: SlotClock, U: BeaconNode> DutiesManager<T, U> {
///
/// The present `epoch` will be learned from the supplied `SlotClock`. In production this will
/// be a wall-clock (e.g., system time, remote server time, etc.).
pub fn update(&self, slot: Slot) -> Result<UpdateOutcome, Error> {
fn update(&self, slot: Slot) -> Result<UpdateOutcome, Error> {
let epoch = slot.epoch(self.spec.slots_per_epoch);
if let Some(duties) = self
@ -75,6 +77,27 @@ impl<T: SlotClock, U: BeaconNode> DutiesManager<T, U> {
Ok(UpdateOutcome::UnknownValidatorOrEpoch(epoch))
}
}
/// A future wrapping around `update()`. This will perform logic based upon the update
/// process and complete once the update has completed.
pub fn run_update(&self, slot: Slot, log: slog::Logger) -> Result<Async<()>, ()> {
match self.update(slot) {
Err(error) => error!(log, "Epoch duties poll error"; "error" => format!("{:?}", error)),
Ok(UpdateOutcome::NoChange(epoch)) => {
debug!(log, "No change in duties"; "epoch" => epoch)
}
Ok(UpdateOutcome::DutiesChanged(epoch, duties)) => {
info!(log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(UpdateOutcome::NewDuties(epoch, duties)) => {
info!(log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(UpdateOutcome::UnknownValidatorOrEpoch(epoch)) => {
error!(log, "Epoch or validator unknown"; "epoch" => epoch)
}
};
Ok(Async::Ready(()))
}
}
impl From<BeaconNodeError> for Error {

View File

@ -222,21 +222,22 @@ impl Service {
// build requisite objects to pass to core thread.
let duties_map = Arc::new(EpochDutiesMap::new(config.spec.slots_per_epoch));
let epoch_map_for_attester = Arc::new(EpochMap::new(config.spec.slots_per_epoch));
let manager = DutiesManager {
let manager = Arc::new(DutiesManager {
duties_map,
pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(),
spec: Arc::new(config.spec),
slot_clock: service.slot_clock.clone(),
beacon_node: service.validator_client.clone(),
};
});
// run the core thread
runtime
.block_on(interval.for_each(move |_| {
let log = service.log.clone();
// get the current slot
let current_slot = match service.slot_clock.present_slot() {
Err(e) => {
error!(service.log, "SystemTimeError {:?}", e);
error!(log, "SystemTimeError {:?}", e);
return Ok(());
}
Ok(slot) => slot.expect("Genesis is in the future"),
@ -247,28 +248,14 @@ impl Service {
"The Timer should poll a new slot"
);
info!(service.log, "Processing slot: {}", current_slot.as_u64());
info!(log, "Processing slot: {}", current_slot.as_u64());
let cloned_manager = manager.clone();
// check for new duties
// TODO: Convert to its own thread
match manager.update(current_slot) {
Err(error) => {
error!(service.log, "Epoch duties poll error"; "error" => format!("{:?}", error))
}
Ok(UpdateOutcome::NoChange(epoch)) => {
debug!(service.log, "No change in duties"; "epoch" => epoch)
}
Ok(UpdateOutcome::DutiesChanged(epoch, duties)) => {
info!(service.log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(UpdateOutcome::NewDuties(epoch, duties)) => {
info!(service.log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(UpdateOutcome::UnknownValidatorOrEpoch(epoch)) => {
error!(service.log, "Epoch or validator unknown"; "epoch" => epoch)
}
};
tokio::spawn(futures::future::poll_fn(move || {
cloned_manager.run_update(current_slot.clone(), log.clone())
}));
Ok(())
}))
.map_err(|e| format!("Service thread failed: {:?}", e))?;