From ffb3d943556935d50b9815596e876296ec1c1283 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 26 Mar 2019 11:59:48 +1100 Subject: [PATCH] Wrap the duty manager in a future for its own thread --- validator_client/Cargo.toml | 1 + validator_client/src/duties/mod.rs | 25 +++++++++++++++++++++- validator_client/src/service.rs | 33 +++++++++--------------------- 3 files changed, 35 insertions(+), 24 deletions(-) diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index eace153fa..570e06d74 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -32,3 +32,4 @@ tokio = "0.1.18" tokio-timer = "0.2.10" error-chain = "0.12.0" bincode = "^1.1.2" +futures = "0.1.25" diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs index fa53180a6..185f80d6d 100644 --- a/validator_client/src/duties/mod.rs +++ b/validator_client/src/duties/mod.rs @@ -8,6 +8,8 @@ pub use self::epoch_duties::EpochDutiesMap; use self::epoch_duties::{EpochDuties, EpochDutiesMapError}; use self::traits::{BeaconNode, BeaconNodeError}; use bls::PublicKey; +use futures::Async; +use slog::{debug, error, info}; use slot_clock::SlotClock; use std::sync::Arc; use types::{ChainSpec, Epoch, Slot}; @@ -52,7 +54,7 @@ impl DutiesManager { /// /// The present `epoch` will be learned from the supplied `SlotClock`. In production this will /// be a wall-clock (e.g., system time, remote server time, etc.). - pub fn update(&self, slot: Slot) -> Result { + fn update(&self, slot: Slot) -> Result { let epoch = slot.epoch(self.spec.slots_per_epoch); if let Some(duties) = self @@ -75,6 +77,27 @@ impl DutiesManager { Ok(UpdateOutcome::UnknownValidatorOrEpoch(epoch)) } } + + /// A future wrapping around `update()`. This will perform logic based upon the update + /// process and complete once the update has completed. + pub fn run_update(&self, slot: Slot, log: slog::Logger) -> Result, ()> { + match self.update(slot) { + Err(error) => error!(log, "Epoch duties poll error"; "error" => format!("{:?}", error)), + Ok(UpdateOutcome::NoChange(epoch)) => { + debug!(log, "No change in duties"; "epoch" => epoch) + } + Ok(UpdateOutcome::DutiesChanged(epoch, duties)) => { + info!(log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties)) + } + Ok(UpdateOutcome::NewDuties(epoch, duties)) => { + info!(log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties)) + } + Ok(UpdateOutcome::UnknownValidatorOrEpoch(epoch)) => { + error!(log, "Epoch or validator unknown"; "epoch" => epoch) + } + }; + Ok(Async::Ready(())) + } } impl From for Error { diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index 8a7e90d10..9ca591b92 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -222,21 +222,22 @@ impl Service { // build requisite objects to pass to core thread. let duties_map = Arc::new(EpochDutiesMap::new(config.spec.slots_per_epoch)); let epoch_map_for_attester = Arc::new(EpochMap::new(config.spec.slots_per_epoch)); - let manager = DutiesManager { + let manager = Arc::new(DutiesManager { duties_map, pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(), spec: Arc::new(config.spec), slot_clock: service.slot_clock.clone(), beacon_node: service.validator_client.clone(), - }; + }); // run the core thread runtime .block_on(interval.for_each(move |_| { + let log = service.log.clone(); // get the current slot let current_slot = match service.slot_clock.present_slot() { Err(e) => { - error!(service.log, "SystemTimeError {:?}", e); + error!(log, "SystemTimeError {:?}", e); return Ok(()); } Ok(slot) => slot.expect("Genesis is in the future"), @@ -247,28 +248,14 @@ impl Service { "The Timer should poll a new slot" ); - info!(service.log, "Processing slot: {}", current_slot.as_u64()); + info!(log, "Processing slot: {}", current_slot.as_u64()); + + let cloned_manager = manager.clone(); // check for new duties - // TODO: Convert to its own thread - match manager.update(current_slot) { - Err(error) => { - error!(service.log, "Epoch duties poll error"; "error" => format!("{:?}", error)) - } - Ok(UpdateOutcome::NoChange(epoch)) => { - debug!(service.log, "No change in duties"; "epoch" => epoch) - } - Ok(UpdateOutcome::DutiesChanged(epoch, duties)) => { - info!(service.log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties)) - } - Ok(UpdateOutcome::NewDuties(epoch, duties)) => { - info!(service.log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties)) - } - Ok(UpdateOutcome::UnknownValidatorOrEpoch(epoch)) => { - error!(service.log, "Epoch or validator unknown"; "epoch" => epoch) - } - }; - + tokio::spawn(futures::future::poll_fn(move || { + cloned_manager.run_update(current_slot.clone(), log.clone()) + })); Ok(()) })) .map_err(|e| format!("Service thread failed: {:?}", e))?;