diff --git a/validator_client/src/block_producer/grpc.rs b/validator_client/src/block_producer/grpc.rs new file mode 100644 index 000000000..4032d49f3 --- /dev/null +++ b/validator_client/src/block_producer/grpc.rs @@ -0,0 +1,63 @@ +use super::traits::{BeaconNode, BeaconNodeError}; +use protos::services::{ + BeaconBlock as GrpcBeaconBlock, ProduceBeaconBlockRequest, PublishBeaconBlockRequest, +}; +use protos::services_grpc::BeaconBlockServiceClient; +use ssz::{ssz_encode, Decodable}; +use types::{BeaconBlock, BeaconBlockBody, Hash256, Signature}; + +impl BeaconNode for BeaconBlockServiceClient { + fn produce_beacon_block(&self, slot: u64) -> Result, BeaconNodeError> { + let mut req = ProduceBeaconBlockRequest::new(); + req.set_slot(slot); + + let reply = self + .produce_beacon_block(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + if reply.has_block() { + let block = reply.get_block(); + + let (signature, _) = Signature::ssz_decode(block.get_signature(), 0) + .map_err(|_| BeaconNodeError::DecodeFailure)?; + + // TODO: this conversion is incomplete; fix it. + Ok(Some(BeaconBlock { + slot: block.get_slot(), + parent_root: Hash256::zero(), + state_root: Hash256::zero(), + randao_reveal: Hash256::from(block.get_randao_reveal()), + candidate_pow_receipt_root: Hash256::zero(), + signature, + body: BeaconBlockBody { + proposer_slashings: vec![], + casper_slashings: vec![], + attestations: vec![], + deposits: vec![], + exits: vec![], + }, + })) + } else { + Ok(None) + } + } + + fn publish_beacon_block(&self, block: BeaconBlock) -> Result { + let mut req = PublishBeaconBlockRequest::new(); + + // TODO: this conversion is incomplete; fix it. + let mut grpc_block = GrpcBeaconBlock::new(); + grpc_block.set_slot(block.slot); + grpc_block.set_block_root(vec![0]); + grpc_block.set_randao_reveal(block.randao_reveal.to_vec()); + grpc_block.set_signature(ssz_encode(&block.signature)); + + req.set_block(grpc_block); + + let reply = self + .publish_beacon_block(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + Ok(reply.get_success()) + } +} diff --git a/validator_client/src/block_producer/mod.rs b/validator_client/src/block_producer/mod.rs index 4d755979b..80896d873 100644 --- a/validator_client/src/block_producer/mod.rs +++ b/validator_client/src/block_producer/mod.rs @@ -1,3 +1,5 @@ +mod grpc; +mod test_node; mod traits; use self::traits::{BeaconNode, BeaconNodeError}; @@ -8,6 +10,8 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; use types::BeaconBlock; +type EpochMap = HashMap; + #[derive(Debug, PartialEq)] pub enum PollOutcome { BlockProduced, @@ -24,27 +28,28 @@ pub enum PollError { SlotUnknowable, EpochMapPoisoned, SlotClockPoisoned, + EpochLengthIsZero, BeaconNodeError(BeaconNodeError), } pub struct BlockProducer { pub last_processed_slot: u64, - _spec: Arc, + spec: Arc, epoch_map: Arc>>, slot_clock: Arc>, - beacon_node: U, + beacon_node: Arc, } impl BlockProducer { pub fn new( spec: Arc, - epoch_map: Arc>>, + epoch_map: Arc>, slot_clock: Arc>, - beacon_node: U, + beacon_node: Arc, ) -> Self { Self { last_processed_slot: 0, - _spec: spec, + spec, epoch_map, slot_clock, beacon_node, @@ -65,6 +70,9 @@ impl BlockProducer { .map_err(|_| PollError::SlotClockError)? .ok_or(PollError::SlotUnknowable)?; + let epoch = slot.checked_div(self.spec.epoch_length) + .ok_or(PollError::EpochLengthIsZero)?; + // If this is a new slot. if slot > self.last_processed_slot { let is_block_production_slot = { @@ -72,9 +80,9 @@ impl BlockProducer { .epoch_map .read() .map_err(|_| PollError::EpochMapPoisoned)?; - match epoch_map.get(&slot) { + match epoch_map.get(&epoch) { None => return Ok(PollOutcome::ProducerDutiesUnknown), - Some(duties) => duties.is_block_production_slot(slot) + Some(duties) => duties.is_block_production_slot(slot), } }; @@ -125,3 +133,66 @@ impl From for PollError { PollError::BeaconNodeError(e) } } + +#[cfg(test)] +mod tests { + use super::test_node::TestBeaconNode; + use super::*; + use slot_clock::TestingSlotClock; + use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; + + // TODO: implement more thorough testing. + // + // These tests should serve as a good example for future tests. + + #[test] + pub fn polling() { + let mut rng = XorShiftRng::from_seed([42; 16]); + + let spec = Arc::new(ChainSpec::foundation()); + let epoch_map = Arc::new(RwLock::new(EpochMap::new())); + let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0))); + let beacon_node = Arc::new(TestBeaconNode::default()); + + let mut block_producer = BlockProducer::new( + spec.clone(), + epoch_map.clone(), + slot_clock.clone(), + beacon_node.clone(), + ); + + // Configure responses from the BeaconNode. + beacon_node.set_next_produce_result(Ok(Some(BeaconBlock::random_for_test(&mut rng)))); + beacon_node.set_next_publish_result(Ok(true)); + + + // Setup some valid duties for the validator + let produce_slot = 100; + let duties = EpochDuties { + block_production_slot: Some(produce_slot), + ..std::default::Default::default() + }; + let produce_epoch = produce_slot / spec.epoch_length; + epoch_map.write().unwrap().insert(produce_epoch, duties); + + // One slot before production slot... + slot_clock.write().unwrap().set_slot(produce_slot - 1); + assert_eq!(block_producer.poll(), Ok(PollOutcome::BlockProductionNotRequired)); + + // On the produce slot... + slot_clock.write().unwrap().set_slot(produce_slot); + assert_eq!(block_producer.poll(), Ok(PollOutcome::BlockProduced)); + + // Trying the same produce slot again... + slot_clock.write().unwrap().set_slot(produce_slot); + assert_eq!(block_producer.poll(), Ok(PollOutcome::SlotAlreadyProcessed)); + + // One slot after the produce slot... + slot_clock.write().unwrap().set_slot(produce_slot + 1); + assert_eq!(block_producer.poll(), Ok(PollOutcome::BlockProductionNotRequired)); + + // In an epoch without known duties... + slot_clock.write().unwrap().set_slot((produce_epoch + 1) * spec.epoch_length); + assert_eq!(block_producer.poll(), Ok(PollOutcome::ProducerDutiesUnknown)); + } +} diff --git a/validator_client/src/block_producer/test_node.rs b/validator_client/src/block_producer/test_node.rs new file mode 100644 index 000000000..fdb179b5c --- /dev/null +++ b/validator_client/src/block_producer/test_node.rs @@ -0,0 +1,42 @@ +use super::traits::{BeaconNode, BeaconNodeError}; +use types::BeaconBlock; +use std::sync::RwLock; + +type ProduceResult = Result, BeaconNodeError>; +type PublishResult = Result; + +#[derive(Default)] +pub struct TestBeaconNode { + pub produce_input: RwLock>, + pub produce_result: RwLock>, + pub publish_input: RwLock>, + pub publish_result: RwLock>, +} + +impl TestBeaconNode { + pub fn set_next_produce_result(&self, result: ProduceResult) { + *self.produce_result.write().unwrap() = Some(result); + } + + pub fn set_next_publish_result(&self, result: PublishResult) { + *self.publish_result.write().unwrap() = Some(result); + } +} + +impl BeaconNode for TestBeaconNode { + fn produce_beacon_block(&self, slot: u64) -> ProduceResult { + *self.produce_input.write().unwrap() = Some(slot); + match *self.produce_result.read().unwrap() { + Some(ref r) => r.clone(), + None => panic!("TestBeaconNode: produce_result == None") + } + } + + fn publish_beacon_block(&self, block: BeaconBlock) -> PublishResult { + *self.publish_input.write().unwrap() = Some(block); + match *self.publish_result.read().unwrap() { + Some(ref r) => r.clone(), + None => panic!("TestBeaconNode: publish_result == None") + } + } +} diff --git a/validator_client/src/block_producer/traits.rs b/validator_client/src/block_producer/traits.rs index 42ceb8c4a..4c92b92dd 100644 --- a/validator_client/src/block_producer/traits.rs +++ b/validator_client/src/block_producer/traits.rs @@ -1,11 +1,6 @@ -use protos::services::{ - BeaconBlock as GrpcBeaconBlock, ProduceBeaconBlockRequest, PublishBeaconBlockRequest, -}; -use protos::services_grpc::BeaconBlockServiceClient; -use ssz::{ssz_encode, Decodable}; -use types::{BeaconBlock, BeaconBlockBody, Hash256, Signature}; +use types::BeaconBlock; -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum BeaconNodeError { RemoteFailure(String), DecodeFailure, @@ -15,59 +10,3 @@ pub trait BeaconNode { fn produce_beacon_block(&self, slot: u64) -> Result, BeaconNodeError>; fn publish_beacon_block(&self, block: BeaconBlock) -> Result; } - -impl BeaconNode for BeaconBlockServiceClient { - fn produce_beacon_block(&self, slot: u64) -> Result, BeaconNodeError> { - let mut req = ProduceBeaconBlockRequest::new(); - req.set_slot(slot); - - let reply = self - .produce_beacon_block(&req) - .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; - - if reply.has_block() { - let block = reply.get_block(); - - let (signature, _) = Signature::ssz_decode(block.get_signature(), 0) - .map_err(|_| BeaconNodeError::DecodeFailure)?; - - // TODO: this conversion is incomplete; fix it. - Ok(Some(BeaconBlock { - slot: block.get_slot(), - parent_root: Hash256::zero(), - state_root: Hash256::zero(), - randao_reveal: Hash256::from(block.get_randao_reveal()), - candidate_pow_receipt_root: Hash256::zero(), - signature, - body: BeaconBlockBody { - proposer_slashings: vec![], - casper_slashings: vec![], - attestations: vec![], - deposits: vec![], - exits: vec![], - }, - })) - } else { - Ok(None) - } - } - - fn publish_beacon_block(&self, block: BeaconBlock) -> Result { - let mut req = PublishBeaconBlockRequest::new(); - - // TODO: this conversion is incomplete; fix it. - let mut grpc_block = GrpcBeaconBlock::new(); - grpc_block.set_slot(block.slot); - grpc_block.set_block_root(vec![0]); - grpc_block.set_randao_reveal(block.randao_reveal.to_vec()); - grpc_block.set_signature(ssz_encode(&block.signature)); - - req.set_block(grpc_block); - - let reply = self - .publish_beacon_block(&req) - .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; - - Ok(reply.get_success()) - } -} diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index 092f51285..51f4de6c2 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -23,7 +23,7 @@ fn main() { // gRPC let env = Arc::new(EnvBuilder::new().build()); let ch = ChannelBuilder::new(env).connect("localhost:50051"); - let client = BeaconBlockServiceClient::new(ch); + let client = Arc::new(BeaconBlockServiceClient::new(ch)); // Logging let decorator = slog_term::TermDecorator::new().build(); @@ -83,6 +83,7 @@ fn main() { tokio::run(task); } +#[derive(Debug, PartialEq, Clone, Copy, Default)] pub struct EpochDuties { block_production_slot: Option, shard: Option,