Replace EpochDutiesMap with trait in BlockProducer

This commit is contained in:
Paul Hauner 2019-01-23 21:01:46 +11:00
parent af6437eb13
commit 8e935f93bc
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
6 changed files with 92 additions and 34 deletions

View File

@ -63,7 +63,7 @@ impl BeaconNode for BeaconBlockServiceClient {
let mut grpc_block = GrpcBeaconBlock::new(); let mut grpc_block = GrpcBeaconBlock::new();
grpc_block.set_slot(block.slot); grpc_block.set_slot(block.slot);
grpc_block.set_block_root(vec![0]); grpc_block.set_block_root(vec![0]);
grpc_block.set_randao_reveal(block.randao_reveal.to_vec()); grpc_block.set_randao_reveal(ssz_encode(&block.randao_reveal));
grpc_block.set_signature(ssz_encode(&block.signature)); grpc_block.set_signature(ssz_encode(&block.signature));
req.set_block(grpc_block); req.set_block(grpc_block);

View File

@ -2,10 +2,9 @@ mod grpc;
mod service; mod service;
#[cfg(test)] #[cfg(test)]
mod test_node; mod test_node;
mod traits; pub mod traits;
use self::traits::{BeaconNode, BeaconNodeError}; use self::traits::{BeaconNode, BeaconNodeError, DutiesReader, DutiesReaderError};
use super::EpochDutiesMap;
use slot_clock::SlotClock; use slot_clock::SlotClock;
use spec::ChainSpec; use spec::ChainSpec;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -45,19 +44,19 @@ pub enum Error {
/// Ensures that messages are not slashable. /// Ensures that messages are not slashable.
/// ///
/// Relies upon an external service to keep the `EpochDutiesMap` updated. /// Relies upon an external service to keep the `EpochDutiesMap` updated.
pub struct BlockProducer<T: SlotClock, U: BeaconNode> { pub struct BlockProducer<T: SlotClock, U: BeaconNode, V: DutiesReader> {
pub last_processed_slot: u64, pub last_processed_slot: u64,
spec: Arc<ChainSpec>, spec: Arc<ChainSpec>,
epoch_map: Arc<RwLock<EpochDutiesMap>>, epoch_map: Arc<V>,
slot_clock: Arc<RwLock<T>>, slot_clock: Arc<RwLock<T>>,
beacon_node: Arc<U>, beacon_node: Arc<U>,
} }
impl<T: SlotClock, U: BeaconNode> BlockProducer<T, U> { impl<T: SlotClock, U: BeaconNode, V: DutiesReader> BlockProducer<T, U, V> {
/// Returns a new instance where `last_processed_slot == 0`. /// Returns a new instance where `last_processed_slot == 0`.
pub fn new( pub fn new(
spec: Arc<ChainSpec>, spec: Arc<ChainSpec>,
epoch_map: Arc<RwLock<EpochDutiesMap>>, epoch_map: Arc<V>,
slot_clock: Arc<RwLock<T>>, slot_clock: Arc<RwLock<T>>,
beacon_node: Arc<U>, beacon_node: Arc<U>,
) -> Self { ) -> Self {
@ -71,7 +70,7 @@ impl<T: SlotClock, U: BeaconNode> BlockProducer<T, U> {
} }
} }
impl<T: SlotClock, U: BeaconNode> BlockProducer<T, U> { impl<T: SlotClock, U: BeaconNode, V: DutiesReader> BlockProducer<T, U, V> {
/// "Poll" to see if the validator is required to take any action. /// "Poll" to see if the validator is required to take any action.
/// ///
/// The slot clock will be read and any new actions undertaken. /// The slot clock will be read and any new actions undertaken.
@ -90,13 +89,14 @@ impl<T: SlotClock, U: BeaconNode> BlockProducer<T, U> {
// If this is a new slot. // If this is a new slot.
if slot > self.last_processed_slot { if slot > self.last_processed_slot {
let is_block_production_slot = { let is_block_production_slot =
let epoch_map = self.epoch_map.read().map_err(|_| Error::EpochMapPoisoned)?; match self.epoch_map.is_block_production_slot(epoch, slot) {
match epoch_map.get(&epoch) { Ok(result) => result,
None => return Ok(PollOutcome::ProducerDutiesUnknown(slot)), Err(DutiesReaderError::UnknownEpoch) => {
Some(duties) => duties.is_block_production_slot(slot), return Ok(PollOutcome::ProducerDutiesUnknown(slot))
} }
}; Err(DutiesReaderError::Poisoned) => return Err(Error::EpochMapPoisoned),
};
if is_block_production_slot { if is_block_production_slot {
self.last_processed_slot = slot; self.last_processed_slot = slot;
@ -178,6 +178,7 @@ mod tests {
use super::test_node::TestBeaconNode; use super::test_node::TestBeaconNode;
use super::*; use super::*;
use crate::duties::EpochDuties; use crate::duties::EpochDuties;
use crate::duties::EpochDutiesMap;
use slot_clock::TestingSlotClock; use slot_clock::TestingSlotClock;
use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; use types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
@ -191,7 +192,7 @@ mod tests {
let mut rng = XorShiftRng::from_seed([42; 16]); let mut rng = XorShiftRng::from_seed([42; 16]);
let spec = Arc::new(ChainSpec::foundation()); let spec = Arc::new(ChainSpec::foundation());
let epoch_map = Arc::new(RwLock::new(EpochDutiesMap::new())); let epoch_map = Arc::new(EpochDutiesMap::new());
let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0))); let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0)));
let beacon_node = Arc::new(TestBeaconNode::default()); let beacon_node = Arc::new(TestBeaconNode::default());
@ -213,7 +214,7 @@ mod tests {
..std::default::Default::default() ..std::default::Default::default()
}; };
let produce_epoch = produce_slot / spec.epoch_length; let produce_epoch = produce_slot / spec.epoch_length;
epoch_map.write().unwrap().insert(produce_epoch, duties); epoch_map.insert(produce_epoch, duties);
// One slot before production slot... // One slot before production slot...
slot_clock.write().unwrap().set_slot(produce_slot - 1); slot_clock.write().unwrap().set_slot(produce_slot - 1);

View File

@ -1,15 +1,15 @@
use super::traits::BeaconNode; use super::traits::{BeaconNode, DutiesReader};
use super::{BlockProducer, PollOutcome as BlockProducerPollOutcome, SlotClock}; use super::{BlockProducer, PollOutcome as BlockProducerPollOutcome, SlotClock};
use slog::{error, info, warn, Logger}; use slog::{error, info, warn, Logger};
use std::time::Duration; use std::time::Duration;
pub struct BlockProducerService<T: SlotClock, U: BeaconNode> { pub struct BlockProducerService<T: SlotClock, U: BeaconNode, V: DutiesReader> {
pub block_producer: BlockProducer<T, U>, pub block_producer: BlockProducer<T, U, V>,
pub poll_interval_millis: u64, pub poll_interval_millis: u64,
pub log: Logger, pub log: Logger,
} }
impl<T: SlotClock, U: BeaconNode> BlockProducerService<T, U> { impl<T: SlotClock, U: BeaconNode, V: DutiesReader> BlockProducerService<T, U, V> {
/// Run a loop which polls the block producer each `poll_interval_millis` millseconds. /// Run a loop which polls the block producer each `poll_interval_millis` millseconds.
/// ///
/// Logs the results of the polls. /// Logs the results of the polls.

View File

@ -17,3 +17,12 @@ pub trait BeaconNode: Send + Sync {
/// Returns `true` if the publish was sucessful. /// Returns `true` if the publish was sucessful.
fn publish_beacon_block(&self, block: BeaconBlock) -> Result<bool, BeaconNodeError>; fn publish_beacon_block(&self, block: BeaconBlock) -> Result<bool, BeaconNodeError>;
} }
pub enum DutiesReaderError {
UnknownEpoch,
Poisoned,
}
pub trait DutiesReader: Send + Sync {
fn is_block_production_slot(&self, epoch: u64, slot: u64) -> Result<bool, DutiesReaderError>;
}

View File

@ -5,6 +5,7 @@ mod test_node;
mod traits; mod traits;
use self::traits::{BeaconNode, BeaconNodeError}; use self::traits::{BeaconNode, BeaconNodeError};
use super::block_producer::traits::{DutiesReader, DutiesReaderError};
use bls::PublicKey; use bls::PublicKey;
use slot_clock::SlotClock; use slot_clock::SlotClock;
use spec::ChainSpec; use spec::ChainSpec;
@ -36,8 +37,52 @@ impl EpochDuties {
} }
} }
pub enum EpochDutiesMapError {
Poisoned,
}
/// Maps an `epoch` to some `EpochDuties` for a single validator. /// Maps an `epoch` to some `EpochDuties` for a single validator.
pub type EpochDutiesMap = HashMap<u64, EpochDuties>; pub struct EpochDutiesMap {
pub map: RwLock<HashMap<u64, EpochDuties>>,
}
impl EpochDutiesMap {
pub fn new() -> Self {
Self {
map: RwLock::new(HashMap::new()),
}
}
pub fn get(&self, epoch: u64) -> Result<Option<EpochDuties>, EpochDutiesMapError> {
let map = self.map.read().map_err(|_| EpochDutiesMapError::Poisoned)?;
match map.get(&epoch) {
Some(duties) => Ok(Some(duties.clone())),
None => Ok(None),
}
}
pub fn insert(
&self,
epoch: u64,
epoch_duties: EpochDuties,
) -> Result<Option<EpochDuties>, EpochDutiesMapError> {
let mut map = self
.map
.write()
.map_err(|_| EpochDutiesMapError::Poisoned)?;
Ok(map.insert(epoch, epoch_duties))
}
}
impl DutiesReader for EpochDutiesMap {
fn is_block_production_slot(&self, epoch: u64, slot: u64) -> Result<bool, DutiesReaderError> {
let map = self.map.read().map_err(|_| DutiesReaderError::Poisoned)?;
let duties = map
.get(&epoch)
.ok_or_else(|| DutiesReaderError::UnknownEpoch)?;
Ok(duties.is_block_production_slot(slot))
}
}
#[derive(Debug, PartialEq, Clone, Copy)] #[derive(Debug, PartialEq, Clone, Copy)]
pub enum PollOutcome { pub enum PollOutcome {
@ -68,7 +113,7 @@ pub enum Error {
/// ///
/// There is a single `DutiesManager` per validator instance. /// There is a single `DutiesManager` per validator instance.
pub struct DutiesManager<T: SlotClock, U: BeaconNode> { pub struct DutiesManager<T: SlotClock, U: BeaconNode> {
pub duties_map: Arc<RwLock<EpochDutiesMap>>, pub duties_map: Arc<EpochDutiesMap>,
/// The validator's public key. /// The validator's public key.
pub pubkey: PublicKey, pub pubkey: PublicKey,
pub spec: Arc<ChainSpec>, pub spec: Arc<ChainSpec>,
@ -95,14 +140,9 @@ impl<T: SlotClock, U: BeaconNode> DutiesManager<T, U> {
.ok_or(Error::EpochLengthIsZero)?; .ok_or(Error::EpochLengthIsZero)?;
if let Some(duties) = self.beacon_node.request_shuffling(epoch, &self.pubkey)? { if let Some(duties) = self.beacon_node.request_shuffling(epoch, &self.pubkey)? {
let mut map = self
.duties_map
.write()
.map_err(|_| Error::EpochMapPoisoned)?;
// If these duties were known, check to see if they're updates or identical. // If these duties were known, check to see if they're updates or identical.
let result = if let Some(known_duties) = map.get(&epoch) { let result = if let Some(known_duties) = self.duties_map.get(epoch)? {
if *known_duties == duties { if known_duties == duties {
Ok(PollOutcome::NoChange(epoch)) Ok(PollOutcome::NoChange(epoch))
} else { } else {
Ok(PollOutcome::DutiesChanged(epoch, duties)) Ok(PollOutcome::DutiesChanged(epoch, duties))
@ -110,7 +150,7 @@ impl<T: SlotClock, U: BeaconNode> DutiesManager<T, U> {
} else { } else {
Ok(PollOutcome::NewDuties(epoch, duties)) Ok(PollOutcome::NewDuties(epoch, duties))
}; };
map.insert(epoch, duties); self.duties_map.insert(epoch, duties)?;
result result
} else { } else {
Ok(PollOutcome::UnknownValidatorOrEpoch(epoch)) Ok(PollOutcome::UnknownValidatorOrEpoch(epoch))
@ -124,6 +164,14 @@ impl From<BeaconNodeError> for Error {
} }
} }
impl From<EpochDutiesMapError> for Error {
fn from(e: EpochDutiesMapError) -> Error {
match e {
EpochDutiesMapError::Poisoned => Error::EpochMapPoisoned,
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::test_node::TestBeaconNode; use super::test_node::TestBeaconNode;
@ -139,7 +187,7 @@ mod tests {
#[test] #[test]
pub fn polling() { pub fn polling() {
let spec = Arc::new(ChainSpec::foundation()); let spec = Arc::new(ChainSpec::foundation());
let duties_map = Arc::new(RwLock::new(EpochDutiesMap::new())); let duties_map = Arc::new(EpochDutiesMap::new());
let keypair = Keypair::random(); let keypair = Keypair::random();
let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0))); let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0)));
let beacon_node = Arc::new(TestBeaconNode::default()); let beacon_node = Arc::new(TestBeaconNode::default());

View File

@ -107,7 +107,7 @@ fn main() {
for keypair in keypairs { for keypair in keypairs {
info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id()); info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id());
let duties_map = Arc::new(RwLock::new(EpochDutiesMap::new())); let duties_map = Arc::new(EpochDutiesMap::new());
// Spawn a new thread to maintain the validator's `EpochDuties`. // Spawn a new thread to maintain the validator's `EpochDuties`.
let duties_manager_thread = { let duties_manager_thread = {