Impl more validator client

This commit is contained in:
Paul Hauner 2019-01-15 17:42:55 +11:00
parent 3106d28bfa
commit 3891a6017a
No known key found for this signature in database
GPG Key ID: 303E4494BB28068C
5 changed files with 105 additions and 82 deletions

View File

@ -10,7 +10,6 @@ protobuf = "2.0.2"
protos = { path = "../protos" } protos = { path = "../protos" }
slot_clock = { path = "../beacon_chain/utils/slot_clock" } slot_clock = { path = "../beacon_chain/utils/slot_clock" }
spec = { path = "../beacon_chain/spec" } spec = { path = "../beacon_chain/spec" }
tokio = "0.1.14"
types = { path = "../beacon_chain/types" } types = { path = "../beacon_chain/types" }
slog = "^2.2.3" slog = "^2.2.3"
slog-term = "^2.4.0" slog-term = "^2.4.0"

View File

@ -1,4 +1,5 @@
mod grpc; mod grpc;
mod service;
mod test_node; mod test_node;
mod traits; mod traits;
@ -10,20 +11,22 @@ use std::collections::HashMap;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use types::BeaconBlock; use types::BeaconBlock;
pub use self::service::BlockProducerService;
type EpochMap = HashMap<u64, EpochDuties>; type EpochMap = HashMap<u64, EpochDuties>;
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum PollOutcome { pub enum PollOutcome {
BlockProduced, BlockProduced(u64),
SlashableBlockNotProduced, SlashableBlockNotProduced(u64),
BlockProductionNotRequired, BlockProductionNotRequired(u64),
ProducerDutiesUnknown, ProducerDutiesUnknown(u64),
SlotAlreadyProcessed, SlotAlreadyProcessed(u64),
BeaconNodeUnableToProduceBlock, BeaconNodeUnableToProduceBlock(u64),
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum PollError { pub enum Error {
SlotClockError, SlotClockError,
SlotUnknowable, SlotUnknowable,
EpochMapPoisoned, EpochMapPoisoned,
@ -61,27 +64,25 @@ impl<T: SlotClock, U: BeaconNode> BlockProducer<T, U> {
/// "Poll" to see if the validator is required to take any action. /// "Poll" to see if the validator is required to take any action.
/// ///
/// The slot clock will be read and any new actions undertaken. /// The slot clock will be read and any new actions undertaken.
pub fn poll(&mut self) -> Result<PollOutcome, PollError> { pub fn poll(&mut self) -> Result<PollOutcome, Error> {
let slot = self let slot = self
.slot_clock .slot_clock
.read() .read()
.map_err(|_| PollError::SlotClockPoisoned)? .map_err(|_| Error::SlotClockPoisoned)?
.present_slot() .present_slot()
.map_err(|_| PollError::SlotClockError)? .map_err(|_| Error::SlotClockError)?
.ok_or(PollError::SlotUnknowable)?; .ok_or(Error::SlotUnknowable)?;
let epoch = slot.checked_div(self.spec.epoch_length) let epoch = slot
.ok_or(PollError::EpochLengthIsZero)?; .checked_div(self.spec.epoch_length)
.ok_or(Error::EpochLengthIsZero)?;
// If this is a new slot. // If this is a new slot.
if slot > self.last_processed_slot { if slot > self.last_processed_slot {
let is_block_production_slot = { let is_block_production_slot = {
let epoch_map = self let epoch_map = self.epoch_map.read().map_err(|_| Error::EpochMapPoisoned)?;
.epoch_map
.read()
.map_err(|_| PollError::EpochMapPoisoned)?;
match epoch_map.get(&epoch) { match epoch_map.get(&epoch) {
None => return Ok(PollOutcome::ProducerDutiesUnknown), None => return Ok(PollOutcome::ProducerDutiesUnknown(slot)),
Some(duties) => duties.is_block_production_slot(slot), Some(duties) => duties.is_block_production_slot(slot),
} }
}; };
@ -91,24 +92,24 @@ impl<T: SlotClock, U: BeaconNode> BlockProducer<T, U> {
self.produce_block(slot) self.produce_block(slot)
} else { } else {
Ok(PollOutcome::BlockProductionNotRequired) Ok(PollOutcome::BlockProductionNotRequired(slot))
} }
} else { } else {
Ok(PollOutcome::SlotAlreadyProcessed) Ok(PollOutcome::SlotAlreadyProcessed(slot))
} }
} }
fn produce_block(&mut self, slot: u64) -> Result<PollOutcome, PollError> { fn produce_block(&mut self, slot: u64) -> Result<PollOutcome, Error> {
if let Some(block) = self.beacon_node.produce_beacon_block(slot)? { if let Some(block) = self.beacon_node.produce_beacon_block(slot)? {
if self.safe_to_produce(&block) { if self.safe_to_produce(&block) {
let block = self.sign_block(block); let block = self.sign_block(block);
self.beacon_node.publish_beacon_block(block)?; self.beacon_node.publish_beacon_block(block)?;
Ok(PollOutcome::BlockProduced) Ok(PollOutcome::BlockProduced(slot))
} else { } else {
Ok(PollOutcome::SlashableBlockNotProduced) Ok(PollOutcome::SlashableBlockNotProduced(slot))
} }
} else { } else {
Ok(PollOutcome::BeaconNodeUnableToProduceBlock) Ok(PollOutcome::BeaconNodeUnableToProduceBlock(slot))
} }
} }
@ -128,9 +129,9 @@ impl<T: SlotClock, U: BeaconNode> BlockProducer<T, U> {
} }
} }
impl From<BeaconNodeError> for PollError { impl From<BeaconNodeError> for Error {
fn from(e: BeaconNodeError) -> PollError { fn from(e: BeaconNodeError) -> Error {
PollError::BeaconNodeError(e) Error::BeaconNodeError(e)
} }
} }
@ -165,7 +166,6 @@ mod tests {
beacon_node.set_next_produce_result(Ok(Some(BeaconBlock::random_for_test(&mut rng)))); beacon_node.set_next_produce_result(Ok(Some(BeaconBlock::random_for_test(&mut rng))));
beacon_node.set_next_publish_result(Ok(true)); beacon_node.set_next_publish_result(Ok(true));
// Setup some valid duties for the validator // Setup some valid duties for the validator
let produce_slot = 100; let produce_slot = 100;
let duties = EpochDuties { let duties = EpochDuties {
@ -177,22 +177,38 @@ mod tests {
// One slot before production slot... // One slot before production slot...
slot_clock.write().unwrap().set_slot(produce_slot - 1); slot_clock.write().unwrap().set_slot(produce_slot - 1);
assert_eq!(block_producer.poll(), Ok(PollOutcome::BlockProductionNotRequired)); assert_eq!(
block_producer.poll(),
Ok(PollOutcome::BlockProductionNotRequired(produce_slot - 1))
);
// On the produce slot... // On the produce slot...
slot_clock.write().unwrap().set_slot(produce_slot); slot_clock.write().unwrap().set_slot(produce_slot);
assert_eq!(block_producer.poll(), Ok(PollOutcome::BlockProduced)); assert_eq!(
block_producer.poll(),
Ok(PollOutcome::BlockProduced(produce_slot))
);
// Trying the same produce slot again... // Trying the same produce slot again...
slot_clock.write().unwrap().set_slot(produce_slot); slot_clock.write().unwrap().set_slot(produce_slot);
assert_eq!(block_producer.poll(), Ok(PollOutcome::SlotAlreadyProcessed)); assert_eq!(
block_producer.poll(),
Ok(PollOutcome::SlotAlreadyProcessed(produce_slot))
);
// One slot after the produce slot... // One slot after the produce slot...
slot_clock.write().unwrap().set_slot(produce_slot + 1); slot_clock.write().unwrap().set_slot(produce_slot + 1);
assert_eq!(block_producer.poll(), Ok(PollOutcome::BlockProductionNotRequired)); assert_eq!(
block_producer.poll(),
Ok(PollOutcome::BlockProductionNotRequired(produce_slot + 1))
);
// In an epoch without known duties... // In an epoch without known duties...
slot_clock.write().unwrap().set_slot((produce_epoch + 1) * spec.epoch_length); let slot = (produce_epoch + 1) * spec.epoch_length;
assert_eq!(block_producer.poll(), Ok(PollOutcome::ProducerDutiesUnknown)); slot_clock.write().unwrap().set_slot(slot);
assert_eq!(
block_producer.poll(),
Ok(PollOutcome::ProducerDutiesUnknown(slot))
);
} }
} }

View File

@ -0,0 +1,40 @@
use super::traits::BeaconNode;
use super::{BlockProducer, PollOutcome as BlockProducerPollOutcome, SlotClock};
use slog::{error, info, warn, Logger};
use std::time::Duration;
pub struct BlockProducerService<T: SlotClock, U: BeaconNode> {
pub block_producer: BlockProducer<T, U>,
pub poll_interval_millis: u64,
pub log: Logger,
}
impl<T: SlotClock, U: BeaconNode> BlockProducerService<T, U> {
pub fn run(&mut self) {
loop {
match self.block_producer.poll() {
Err(error) => {
error!(self.log, "Block producer poll error"; "error" => format!("{:?}", error))
}
Ok(BlockProducerPollOutcome::BlockProduced(slot)) => info!(self.log, "Produced block"; "slot" => slot),
Ok(BlockProducerPollOutcome::SlashableBlockNotProduced(slot)) => {
warn!(self.log, "Slashable block was not signed"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::BlockProductionNotRequired(slot)) => {
info!(self.log, "Block production not required"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::ProducerDutiesUnknown(slot)) => {
error!(self.log, "Block production duties unknown"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::SlotAlreadyProcessed(slot)) => {
warn!(self.log, "Attempted to re-process slot"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::BeaconNodeUnableToProduceBlock(slot)) => {
error!(self.log, "Beacon node unable to produce block"; "slot" => slot)
}
};
std::thread::sleep(Duration::from_millis(self.poll_interval_millis));
}
}
}

View File

@ -6,7 +6,7 @@ pub enum BeaconNodeError {
DecodeFailure, DecodeFailure,
} }
pub trait BeaconNode { pub trait BeaconNode: Send + Sync {
fn produce_beacon_block(&self, slot: u64) -> Result<Option<BeaconBlock>, BeaconNodeError>; fn produce_beacon_block(&self, slot: u64) -> Result<Option<BeaconBlock>, BeaconNodeError>;
fn publish_beacon_block(&self, block: BeaconBlock) -> Result<bool, BeaconNodeError>; fn publish_beacon_block(&self, block: BeaconBlock) -> Result<bool, BeaconNodeError>;
} }

View File

@ -1,23 +1,13 @@
mod block_producer; use crate::block_producer::{BlockProducer, BlockProducerService};
use spec::ChainSpec;
use tokio::prelude::*;
use tokio::timer::Interval;
use crate::block_producer::{BlockProducer, PollOutcome as BlockProducerPollOutcome};
use std::time::{Duration, Instant};
use std::sync::{Arc, RwLock};
use std::collections::HashMap;
use slot_clock::SystemTimeSlotClock;
use grpcio::{ChannelBuilder, EnvBuilder}; use grpcio::{ChannelBuilder, EnvBuilder};
use protos::services_grpc::BeaconBlockServiceClient; use protos::services_grpc::BeaconBlockServiceClient;
use slog::{info, o, Drain};
use slot_clock::SystemTimeSlotClock;
use spec::ChainSpec;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use slog::{error, info, o, warn, Drain}; mod block_producer;
fn main() { fn main() {
// gRPC // gRPC
@ -47,40 +37,18 @@ fn main() {
Arc::new(RwLock::new(clock)) Arc::new(RwLock::new(clock))
}; };
let mut block_producer = let block_producer =
BlockProducer::new(spec.clone(), epoch_map.clone(), slot_clock.clone(), client); BlockProducer::new(spec.clone(), epoch_map.clone(), slot_clock.clone(), client);
info!(log, "Slot duration"; "milliseconds" => duration); info!(log, "Slot duration"; "milliseconds" => duration);
let task = Interval::new(Instant::now(), Duration::from_millis(duration)) let mut block_producer_service = BlockProducerService {
// .take(10) block_producer,
.for_each(move |_instant| { poll_interval_millis: spec.epoch_length * 1000 / 100, // 1% epoch time precision.
match block_producer.poll() { log: log.clone(),
Err(error) => {
error!(log, "Block producer poll error"; "error" => format!("{:?}", error))
}
Ok(BlockProducerPollOutcome::BlockProduced) => info!(log, "Produced block"),
Ok(BlockProducerPollOutcome::SlashableBlockNotProduced) => {
warn!(log, "Slashable block was not signed")
}
Ok(BlockProducerPollOutcome::BlockProductionNotRequired) => {
info!(log, "Block production not required")
}
Ok(BlockProducerPollOutcome::ProducerDutiesUnknown) => {
error!(log, "Block production duties unknown")
}
Ok(BlockProducerPollOutcome::SlotAlreadyProcessed) => {
warn!(log, "Attempted to re-process slot")
}
Ok(BlockProducerPollOutcome::BeaconNodeUnableToProduceBlock) => {
error!(log, "Beacon node unable to produce block")
}
}; };
Ok(())
})
.map_err(|e| panic!("Block producer interval errored; err={:?}", e));
tokio::run(task); block_producer_service.run();
} }
#[derive(Debug, PartialEq, Clone, Copy, Default)] #[derive(Debug, PartialEq, Clone, Copy, Default)]