From 3891a6017a0e9f4410e33d8e0036229bbacc2392 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 15 Jan 2019 17:42:55 +1100 Subject: [PATCH] Impl more validator client --- validator_client/Cargo.toml | 1 - validator_client/src/block_producer/mod.rs | 84 +++++++++++-------- .../src/block_producer/service.rs | 40 +++++++++ validator_client/src/block_producer/traits.rs | 2 +- validator_client/src/main.rs | 60 ++++--------- 5 files changed, 105 insertions(+), 82 deletions(-) create mode 100644 validator_client/src/block_producer/service.rs diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index ce74639d0..77f57d601 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -10,7 +10,6 @@ protobuf = "2.0.2" protos = { path = "../protos" } slot_clock = { path = "../beacon_chain/utils/slot_clock" } spec = { path = "../beacon_chain/spec" } -tokio = "0.1.14" types = { path = "../beacon_chain/types" } slog = "^2.2.3" slog-term = "^2.4.0" diff --git a/validator_client/src/block_producer/mod.rs b/validator_client/src/block_producer/mod.rs index 80896d873..5987f3a24 100644 --- a/validator_client/src/block_producer/mod.rs +++ b/validator_client/src/block_producer/mod.rs @@ -1,4 +1,5 @@ mod grpc; +mod service; mod test_node; mod traits; @@ -10,20 +11,22 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; use types::BeaconBlock; +pub use self::service::BlockProducerService; + type EpochMap = HashMap; #[derive(Debug, PartialEq)] pub enum PollOutcome { - BlockProduced, - SlashableBlockNotProduced, - BlockProductionNotRequired, - ProducerDutiesUnknown, - SlotAlreadyProcessed, - BeaconNodeUnableToProduceBlock, + BlockProduced(u64), + SlashableBlockNotProduced(u64), + BlockProductionNotRequired(u64), + ProducerDutiesUnknown(u64), + SlotAlreadyProcessed(u64), + BeaconNodeUnableToProduceBlock(u64), } #[derive(Debug, PartialEq)] -pub enum PollError { +pub enum Error { SlotClockError, SlotUnknowable, EpochMapPoisoned, @@ -61,27 +64,25 @@ impl BlockProducer { /// "Poll" to see if the validator is required to take any action. /// /// The slot clock will be read and any new actions undertaken. - pub fn poll(&mut self) -> Result { + pub fn poll(&mut self) -> Result { let slot = self .slot_clock .read() - .map_err(|_| PollError::SlotClockPoisoned)? + .map_err(|_| Error::SlotClockPoisoned)? .present_slot() - .map_err(|_| PollError::SlotClockError)? - .ok_or(PollError::SlotUnknowable)?; + .map_err(|_| Error::SlotClockError)? + .ok_or(Error::SlotUnknowable)?; - let epoch = slot.checked_div(self.spec.epoch_length) - .ok_or(PollError::EpochLengthIsZero)?; + let epoch = slot + .checked_div(self.spec.epoch_length) + .ok_or(Error::EpochLengthIsZero)?; // If this is a new slot. if slot > self.last_processed_slot { let is_block_production_slot = { - let epoch_map = self - .epoch_map - .read() - .map_err(|_| PollError::EpochMapPoisoned)?; + let epoch_map = self.epoch_map.read().map_err(|_| Error::EpochMapPoisoned)?; match epoch_map.get(&epoch) { - None => return Ok(PollOutcome::ProducerDutiesUnknown), + None => return Ok(PollOutcome::ProducerDutiesUnknown(slot)), Some(duties) => duties.is_block_production_slot(slot), } }; @@ -91,24 +92,24 @@ impl BlockProducer { self.produce_block(slot) } else { - Ok(PollOutcome::BlockProductionNotRequired) + Ok(PollOutcome::BlockProductionNotRequired(slot)) } } else { - Ok(PollOutcome::SlotAlreadyProcessed) + Ok(PollOutcome::SlotAlreadyProcessed(slot)) } } - fn produce_block(&mut self, slot: u64) -> Result { + fn produce_block(&mut self, slot: u64) -> Result { if let Some(block) = self.beacon_node.produce_beacon_block(slot)? { if self.safe_to_produce(&block) { let block = self.sign_block(block); self.beacon_node.publish_beacon_block(block)?; - Ok(PollOutcome::BlockProduced) + Ok(PollOutcome::BlockProduced(slot)) } else { - Ok(PollOutcome::SlashableBlockNotProduced) + Ok(PollOutcome::SlashableBlockNotProduced(slot)) } } else { - Ok(PollOutcome::BeaconNodeUnableToProduceBlock) + Ok(PollOutcome::BeaconNodeUnableToProduceBlock(slot)) } } @@ -128,9 +129,9 @@ impl BlockProducer { } } -impl From for PollError { - fn from(e: BeaconNodeError) -> PollError { - PollError::BeaconNodeError(e) +impl From for Error { + fn from(e: BeaconNodeError) -> Error { + Error::BeaconNodeError(e) } } @@ -165,7 +166,6 @@ mod tests { beacon_node.set_next_produce_result(Ok(Some(BeaconBlock::random_for_test(&mut rng)))); beacon_node.set_next_publish_result(Ok(true)); - // Setup some valid duties for the validator let produce_slot = 100; let duties = EpochDuties { @@ -177,22 +177,38 @@ mod tests { // One slot before production slot... slot_clock.write().unwrap().set_slot(produce_slot - 1); - assert_eq!(block_producer.poll(), Ok(PollOutcome::BlockProductionNotRequired)); + assert_eq!( + block_producer.poll(), + Ok(PollOutcome::BlockProductionNotRequired(produce_slot - 1)) + ); // On the produce slot... slot_clock.write().unwrap().set_slot(produce_slot); - assert_eq!(block_producer.poll(), Ok(PollOutcome::BlockProduced)); + assert_eq!( + block_producer.poll(), + Ok(PollOutcome::BlockProduced(produce_slot)) + ); // Trying the same produce slot again... slot_clock.write().unwrap().set_slot(produce_slot); - assert_eq!(block_producer.poll(), Ok(PollOutcome::SlotAlreadyProcessed)); + assert_eq!( + block_producer.poll(), + Ok(PollOutcome::SlotAlreadyProcessed(produce_slot)) + ); // One slot after the produce slot... slot_clock.write().unwrap().set_slot(produce_slot + 1); - assert_eq!(block_producer.poll(), Ok(PollOutcome::BlockProductionNotRequired)); + assert_eq!( + block_producer.poll(), + Ok(PollOutcome::BlockProductionNotRequired(produce_slot + 1)) + ); // In an epoch without known duties... - slot_clock.write().unwrap().set_slot((produce_epoch + 1) * spec.epoch_length); - assert_eq!(block_producer.poll(), Ok(PollOutcome::ProducerDutiesUnknown)); + let slot = (produce_epoch + 1) * spec.epoch_length; + slot_clock.write().unwrap().set_slot(slot); + assert_eq!( + block_producer.poll(), + Ok(PollOutcome::ProducerDutiesUnknown(slot)) + ); } } diff --git a/validator_client/src/block_producer/service.rs b/validator_client/src/block_producer/service.rs new file mode 100644 index 000000000..aef1e3c28 --- /dev/null +++ b/validator_client/src/block_producer/service.rs @@ -0,0 +1,40 @@ +use super::traits::BeaconNode; +use super::{BlockProducer, PollOutcome as BlockProducerPollOutcome, SlotClock}; +use slog::{error, info, warn, Logger}; +use std::time::Duration; + +pub struct BlockProducerService { + pub block_producer: BlockProducer, + pub poll_interval_millis: u64, + pub log: Logger, +} + +impl BlockProducerService { + pub fn run(&mut self) { + loop { + match self.block_producer.poll() { + Err(error) => { + error!(self.log, "Block producer poll error"; "error" => format!("{:?}", error)) + } + Ok(BlockProducerPollOutcome::BlockProduced(slot)) => info!(self.log, "Produced block"; "slot" => slot), + Ok(BlockProducerPollOutcome::SlashableBlockNotProduced(slot)) => { + warn!(self.log, "Slashable block was not signed"; "slot" => slot) + } + Ok(BlockProducerPollOutcome::BlockProductionNotRequired(slot)) => { + info!(self.log, "Block production not required"; "slot" => slot) + } + Ok(BlockProducerPollOutcome::ProducerDutiesUnknown(slot)) => { + error!(self.log, "Block production duties unknown"; "slot" => slot) + } + Ok(BlockProducerPollOutcome::SlotAlreadyProcessed(slot)) => { + warn!(self.log, "Attempted to re-process slot"; "slot" => slot) + } + Ok(BlockProducerPollOutcome::BeaconNodeUnableToProduceBlock(slot)) => { + error!(self.log, "Beacon node unable to produce block"; "slot" => slot) + } + }; + + std::thread::sleep(Duration::from_millis(self.poll_interval_millis)); + } + } +} diff --git a/validator_client/src/block_producer/traits.rs b/validator_client/src/block_producer/traits.rs index 4c92b92dd..aaae031ec 100644 --- a/validator_client/src/block_producer/traits.rs +++ b/validator_client/src/block_producer/traits.rs @@ -6,7 +6,7 @@ pub enum BeaconNodeError { DecodeFailure, } -pub trait BeaconNode { +pub trait BeaconNode: Send + Sync { fn produce_beacon_block(&self, slot: u64) -> Result, BeaconNodeError>; fn publish_beacon_block(&self, block: BeaconBlock) -> Result; } diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index 51f4de6c2..53c289d14 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -1,23 +1,13 @@ -mod block_producer; - -use spec::ChainSpec; -use tokio::prelude::*; -use tokio::timer::Interval; - -use crate::block_producer::{BlockProducer, PollOutcome as BlockProducerPollOutcome}; - -use std::time::{Duration, Instant}; - -use std::sync::{Arc, RwLock}; - -use std::collections::HashMap; - -use slot_clock::SystemTimeSlotClock; - +use crate::block_producer::{BlockProducer, BlockProducerService}; use grpcio::{ChannelBuilder, EnvBuilder}; use protos::services_grpc::BeaconBlockServiceClient; +use slog::{info, o, Drain}; +use slot_clock::SystemTimeSlotClock; +use spec::ChainSpec; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; -use slog::{error, info, o, warn, Drain}; +mod block_producer; fn main() { // gRPC @@ -47,40 +37,18 @@ fn main() { Arc::new(RwLock::new(clock)) }; - let mut block_producer = + let block_producer = BlockProducer::new(spec.clone(), epoch_map.clone(), slot_clock.clone(), client); info!(log, "Slot duration"; "milliseconds" => duration); - let task = Interval::new(Instant::now(), Duration::from_millis(duration)) - // .take(10) - .for_each(move |_instant| { - match block_producer.poll() { - Err(error) => { - error!(log, "Block producer poll error"; "error" => format!("{:?}", error)) - } - Ok(BlockProducerPollOutcome::BlockProduced) => info!(log, "Produced block"), - Ok(BlockProducerPollOutcome::SlashableBlockNotProduced) => { - warn!(log, "Slashable block was not signed") - } - Ok(BlockProducerPollOutcome::BlockProductionNotRequired) => { - info!(log, "Block production not required") - } - Ok(BlockProducerPollOutcome::ProducerDutiesUnknown) => { - error!(log, "Block production duties unknown") - } - Ok(BlockProducerPollOutcome::SlotAlreadyProcessed) => { - warn!(log, "Attempted to re-process slot") - } - Ok(BlockProducerPollOutcome::BeaconNodeUnableToProduceBlock) => { - error!(log, "Beacon node unable to produce block") - } - }; - Ok(()) - }) - .map_err(|e| panic!("Block producer interval errored; err={:?}", e)); + let mut block_producer_service = BlockProducerService { + block_producer, + poll_interval_millis: spec.epoch_length * 1000 / 100, // 1% epoch time precision. + log: log.clone(), + }; - tokio::run(task); + block_producer_service.run(); } #[derive(Debug, PartialEq, Clone, Copy, Default)]