From ebba4977a8a647761b6462533dc752ed2099e104 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 16 Jan 2019 21:44:44 +1100 Subject: [PATCH] Introduce threading to validator client --- validator_client/src/block_producer/mod.rs | 10 ++-- validator_client/src/duties/grpc.rs | 63 ++++++++++++++++++++++ validator_client/src/duties/mod.rs | 19 ++++--- validator_client/src/duties/service.rs | 8 +-- validator_client/src/main.rs | 48 ++++++++++++----- 5 files changed, 116 insertions(+), 32 deletions(-) create mode 100644 validator_client/src/duties/grpc.rs diff --git a/validator_client/src/block_producer/mod.rs b/validator_client/src/block_producer/mod.rs index af12010da..2efcbd6ef 100644 --- a/validator_client/src/block_producer/mod.rs +++ b/validator_client/src/block_producer/mod.rs @@ -4,17 +4,15 @@ mod test_node; mod traits; use self::traits::{BeaconNode, BeaconNodeError}; +use super::EpochDutiesMap; use crate::duties::EpochDuties; use slot_clock::SlotClock; use spec::ChainSpec; -use std::collections::HashMap; use std::sync::{Arc, RwLock}; use types::BeaconBlock; pub use self::service::BlockProducerService; -type EpochMap = HashMap; - #[derive(Debug, PartialEq)] pub enum PollOutcome { BlockProduced(u64), @@ -38,7 +36,7 @@ pub enum Error { pub struct BlockProducer { pub last_processed_slot: u64, spec: Arc, - epoch_map: Arc>>, + epoch_map: Arc>, slot_clock: Arc>, beacon_node: Arc, } @@ -46,7 +44,7 @@ pub struct BlockProducer { impl BlockProducer { pub fn new( spec: Arc, - epoch_map: Arc>, + epoch_map: Arc>, slot_clock: Arc>, beacon_node: Arc, ) -> Self { @@ -151,7 +149,7 @@ mod tests { let mut rng = XorShiftRng::from_seed([42; 16]); let spec = Arc::new(ChainSpec::foundation()); - let epoch_map = Arc::new(RwLock::new(EpochMap::new())); + let epoch_map = Arc::new(RwLock::new(EpochDutiesMap::new())); let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0))); let beacon_node = Arc::new(TestBeaconNode::default()); diff --git a/validator_client/src/duties/grpc.rs b/validator_client/src/duties/grpc.rs new file mode 100644 index 000000000..4032d49f3 --- /dev/null +++ b/validator_client/src/duties/grpc.rs @@ -0,0 +1,63 @@ +use super::traits::{BeaconNode, BeaconNodeError}; +use protos::services::{ + BeaconBlock as GrpcBeaconBlock, ProduceBeaconBlockRequest, PublishBeaconBlockRequest, +}; +use protos::services_grpc::BeaconBlockServiceClient; +use ssz::{ssz_encode, Decodable}; +use types::{BeaconBlock, BeaconBlockBody, Hash256, Signature}; + +impl BeaconNode for BeaconBlockServiceClient { + fn produce_beacon_block(&self, slot: u64) -> Result, BeaconNodeError> { + let mut req = ProduceBeaconBlockRequest::new(); + req.set_slot(slot); + + let reply = self + .produce_beacon_block(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + if reply.has_block() { + let block = reply.get_block(); + + let (signature, _) = Signature::ssz_decode(block.get_signature(), 0) + .map_err(|_| BeaconNodeError::DecodeFailure)?; + + // TODO: this conversion is incomplete; fix it. + Ok(Some(BeaconBlock { + slot: block.get_slot(), + parent_root: Hash256::zero(), + state_root: Hash256::zero(), + randao_reveal: Hash256::from(block.get_randao_reveal()), + candidate_pow_receipt_root: Hash256::zero(), + signature, + body: BeaconBlockBody { + proposer_slashings: vec![], + casper_slashings: vec![], + attestations: vec![], + deposits: vec![], + exits: vec![], + }, + })) + } else { + Ok(None) + } + } + + fn publish_beacon_block(&self, block: BeaconBlock) -> Result { + let mut req = PublishBeaconBlockRequest::new(); + + // TODO: this conversion is incomplete; fix it. + let mut grpc_block = GrpcBeaconBlock::new(); + grpc_block.set_slot(block.slot); + grpc_block.set_block_root(vec![0]); + grpc_block.set_randao_reveal(block.randao_reveal.to_vec()); + grpc_block.set_signature(ssz_encode(&block.signature)); + + req.set_block(grpc_block); + + let reply = self + .publish_beacon_block(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + Ok(reply.get_success()) + } +} diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs index f3c25f221..d401d350e 100644 --- a/validator_client/src/duties/mod.rs +++ b/validator_client/src/duties/mod.rs @@ -9,10 +9,12 @@ mod service; mod test_node; mod traits; +pub use self::service::DutiesManagerService; + #[derive(Debug, PartialEq, Clone, Copy, Default)] pub struct EpochDuties { pub block_production_slot: Option, - pub shard: Option, + // Future shard info } impl EpochDuties { @@ -24,7 +26,7 @@ impl EpochDuties { } } -type EpochDutiesMap = HashMap<(PublicKey, u64), EpochDuties>; +pub type EpochDutiesMap = HashMap; #[derive(Debug, PartialEq, Clone, Copy)] pub enum PollOutcome { @@ -73,7 +75,7 @@ impl DutiesManager { .map_err(|_| Error::EpochMapPoisoned)?; // If these duties were known, check to see if they're updates or identical. - let result = if let Some(known_duties) = map.get(&(self.pubkey.clone(), epoch)) { + let result = if let Some(known_duties) = map.get(&epoch) { if *known_duties == duties { Ok(PollOutcome::NoChange) } else { @@ -82,13 +84,12 @@ impl DutiesManager { } else { Ok(PollOutcome::NewDuties) }; - map.insert((self.pubkey.clone(), epoch), duties); + map.insert(epoch, duties); result } else { Ok(PollOutcome::UnknownValidatorOrEpoch) } } - } impl From for Error { @@ -101,8 +102,8 @@ impl From for Error { mod tests { use super::test_node::TestBeaconNode; use super::*; - use slot_clock::TestingSlotClock; use bls::Keypair; + use slot_clock::TestingSlotClock; // TODO: implement more thorough testing. // @@ -125,9 +126,8 @@ mod tests { }; // Configure response from the BeaconNode. - beacon_node.set_next_shuffling_result(Ok(Some(EpochDuties{ + beacon_node.set_next_shuffling_result(Ok(Some(EpochDuties { block_production_slot: Some(10), - shard: Some(12), }))); // Get the duties for the first time... @@ -136,9 +136,8 @@ mod tests { assert_eq!(manager.poll(), Ok(PollOutcome::NoChange)); // Return new duties. - beacon_node.set_next_shuffling_result(Ok(Some(EpochDuties{ + beacon_node.set_next_shuffling_result(Ok(Some(EpochDuties { block_production_slot: Some(11), - shard: Some(12), }))); assert_eq!(manager.poll(), Ok(PollOutcome::DutiesChanged)); diff --git a/validator_client/src/duties/service.rs b/validator_client/src/duties/service.rs index 10de53634..bedfd69ca 100644 --- a/validator_client/src/duties/service.rs +++ b/validator_client/src/duties/service.rs @@ -1,16 +1,16 @@ -use super::traits::{BeaconNode, BeaconNodeError}; +use super::traits::BeaconNode; use super::{DutiesManager, PollOutcome}; -use slog::{debug, error, info, warn, Logger}; +use slog::{debug, error, info, Logger}; use slot_clock::SlotClock; use std::time::Duration; -pub struct DutiesService { +pub struct DutiesManagerService { pub manager: DutiesManager, pub poll_interval_millis: u64, pub log: Logger, } -impl DutiesService { +impl DutiesManagerService { pub fn run(&mut self) { loop { match self.manager.poll() { diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index a507b8b4b..179146897 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -1,14 +1,16 @@ +use self::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap}; use crate::block_producer::{BlockProducer, BlockProducerService}; use crate::config::ClientConfig; +use bls::Keypair; use clap::{App, Arg}; use grpcio::{ChannelBuilder, EnvBuilder}; use protos::services_grpc::BeaconBlockServiceClient; use slog::{error, info, o, Drain}; use slot_clock::SystemTimeSlotClock; use spec::ChainSpec; -use std::collections::HashMap; use std::path::PathBuf; use std::sync::{Arc, RwLock}; +use std::thread; mod block_producer; mod config; @@ -74,8 +76,7 @@ fn main() { // TODO: Permit loading a custom spec from file. let spec = Arc::new(ChainSpec::foundation()); - // Global map of epoch -> validator duties. - let epoch_map = Arc::new(RwLock::new(HashMap::new())); + // Clock for determining the present slot. let slot_clock = { info!(log, "Genesis time"; "unix_epoch_seconds" => spec.genesis_time); let clock = SystemTimeSlotClock::new(spec.genesis_time, spec.slot_duration) @@ -83,17 +84,40 @@ fn main() { Arc::new(RwLock::new(clock)) }; - let block_producer = - BlockProducer::new(spec.clone(), epoch_map.clone(), slot_clock.clone(), client); - let poll_interval_millis = spec.slot_duration * 1000 / 10; // 10% epoch time precision. info!(log, "Starting block producer service"; "polls_per_epoch" => spec.slot_duration * 1000 / poll_interval_millis); - let mut block_producer_service = BlockProducerService { - block_producer, - poll_interval_millis, - log: log.clone(), - }; + /* + * Start threads. + */ + let keypairs = vec![Keypair::random()]; + let mut threads = vec![]; - block_producer_service.run(); + for keypair in keypairs { + let duties_map = Arc::new(RwLock::new(EpochDutiesMap::new())); + + let producer_thread = { + let spec = spec.clone(); + let duties_map = duties_map.clone(); + let slot_clock = slot_clock.clone(); + let log = log.clone(); + let client = client.clone(); + thread::spawn(move || { + let block_producer = BlockProducer::new(spec, duties_map, slot_clock, client); + let mut block_producer_service = BlockProducerService { + block_producer, + poll_interval_millis, + log, + }; + + block_producer_service.run(); + }) + }; + threads.push(((), producer_thread)); + } + + for tuple in threads { + let (manager, producer) = tuple; + let _ = producer.join(); + } }