diff --git a/validator_client/src/block_producer_service/beacon_block_node.rs b/validator_client/src/block_producer/beacon_block_node.rs similarity index 100% rename from validator_client/src/block_producer_service/beacon_block_node.rs rename to validator_client/src/block_producer/beacon_block_node.rs diff --git a/validator_client/src/block_producer_service/block_producer.rs b/validator_client/src/block_producer/block_producer.rs similarity index 100% rename from validator_client/src/block_producer_service/block_producer.rs rename to validator_client/src/block_producer/block_producer.rs diff --git a/validator_client/src/block_producer_service/mod.rs b/validator_client/src/block_producer/mod.rs similarity index 100% rename from validator_client/src/block_producer_service/mod.rs rename to validator_client/src/block_producer/mod.rs diff --git a/validator_client/src/duties/traits.rs b/validator_client/src/duties/beacon_node_duties.rs similarity index 79% rename from validator_client/src/duties/traits.rs rename to validator_client/src/duties/beacon_node_duties.rs index 374bed9f6..efe9e836d 100644 --- a/validator_client/src/duties/traits.rs +++ b/validator_client/src/duties/beacon_node_duties.rs @@ -2,12 +2,12 @@ use super::EpochDuties; use types::{Epoch, PublicKey}; #[derive(Debug, PartialEq, Clone)] -pub enum BeaconNodeError { +pub enum BeaconNodeDutiesError { RemoteFailure(String), } /// Defines the methods required to obtain a validators shuffling from a Beacon Node. -pub trait BeaconNode: Send + Sync { +pub trait BeaconNodeDuties: Send + Sync { /// Gets the duties for all validators. /// /// Returns a vector of EpochDuties for each validator public key. The entry will be None for @@ -16,5 +16,5 @@ pub trait BeaconNode: Send + Sync { &self, epoch: Epoch, pubkeys: &[PublicKey], - ) -> Result; + ) -> Result; } diff --git a/validator_client/src/duties/grpc.rs b/validator_client/src/duties/grpc.rs index 511ffa34a..f05307141 100644 --- a/validator_client/src/duties/grpc.rs +++ b/validator_client/src/duties/grpc.rs @@ -1,5 +1,5 @@ +use super::beacon_node_duties::{BeaconNodeDuties, BeaconNodeDutiesError}; use super::epoch_duties::{EpochDuties, EpochDuty}; -use super::traits::{BeaconNode, BeaconNodeError}; use grpcio::CallOption; use protos::services::{GetDutiesRequest, Validators}; use protos::services_grpc::ValidatorServiceClient; @@ -8,13 +8,13 @@ use std::collections::HashMap; use std::time::Duration; use types::{Epoch, PublicKey, Slot}; -impl BeaconNode for ValidatorServiceClient { +impl BeaconNodeDuties for ValidatorServiceClient { /// Requests all duties (block signing and committee attesting) from the Beacon Node (BN). fn request_duties( &self, epoch: Epoch, pubkeys: &[PublicKey], - ) -> Result { + ) -> Result { // Get the required duties from all validators // build the request let mut req = GetDutiesRequest::new(); @@ -29,7 +29,7 @@ impl BeaconNode for ValidatorServiceClient { // send the request, get the duties reply let reply = self .get_validator_duties(&req) - .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + .map_err(|err| BeaconNodeDutiesError::RemoteFailure(format!("{:?}", err)))?; let mut epoch_duties: HashMap> = HashMap::new(); for (index, validator_duty) in reply.get_active_validators().iter().enumerate() { diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs index 9fce1a353..c16fc81fc 100644 --- a/validator_client/src/duties/mod.rs +++ b/validator_client/src/duties/mod.rs @@ -1,13 +1,13 @@ +mod beacon_node_duties; mod epoch_duties; mod grpc; // TODO: reintroduce tests //#[cfg(test)] //mod test_node; -mod traits; +pub use self::beacon_node_duties::{BeaconNodeDuties, BeaconNodeDutiesError}; use self::epoch_duties::{EpochDuties, EpochDutiesMapError}; pub use self::epoch_duties::{EpochDutiesMap, WorkInfo}; -use self::traits::{BeaconNode, BeaconNodeError}; use futures::Async; use slog::{debug, error, info}; use std::sync::Arc; @@ -29,7 +29,7 @@ pub enum UpdateOutcome { pub enum Error { DutiesMapPoisoned, EpochMapPoisoned, - BeaconNodeError(BeaconNodeError), + BeaconNodeDutiesError(BeaconNodeDutiesError), UnknownEpoch, UnknownValidator, } @@ -38,14 +38,14 @@ pub enum Error { /// Node. /// /// This keeps track of all validator keys and required voting slots. -pub struct DutiesManager { +pub struct DutiesManager { pub duties_map: RwLock, /// A list of all public keys known to the validator service. pub pubkeys: Vec, pub beacon_node: Arc, } -impl DutiesManager { +impl DutiesManager { /// Check the Beacon Node for `EpochDuties`. /// /// be a wall-clock (e.g., system time, remote server time, etc.). @@ -112,9 +112,9 @@ impl DutiesManager { } //TODO: Use error_chain to handle errors -impl From for Error { - fn from(e: BeaconNodeError) -> Error { - Error::BeaconNodeError(e) +impl From for Error { + fn from(e: BeaconNodeDutiesError) -> Error { + Error::BeaconNodeDutiesError(e) } } diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index 0b11ed0d0..6f02cf6ee 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -1,5 +1,5 @@ mod attester_service; -mod block_producer_service; +mod block_producer; mod config; mod duties; pub mod error; diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index 9c0f5d23c..398f6d777 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -9,15 +9,14 @@ /// data from the beacon node and performs the signing before publishing the block to the beacon /// node. use crate::attester_service::{AttestationGrpcClient, AttesterService}; +use crate::block_producer::BlockProducer; use crate::block_producer_service::BeaconBlockGrpcClient; use crate::config::Config as ValidatorConfig; -use crate::duties::UpdateOutcome; -use crate::duties::{DutiesManager, EpochDutiesMap}; +use crate::duties::{BeaconNodeDuties, DutiesManager, EpochDutiesMap, UpdateOutcome}; use crate::error as error_chain; use crate::error::ErrorKind; use attester::test_utils::EpochMap; use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester}; -use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer}; use bls::Keypair; use grpcio::{ChannelBuilder, EnvBuilder}; use protos::services::Empty; @@ -35,13 +34,13 @@ use tokio::runtime::Builder; use tokio::timer::Interval; use tokio_timer::clock::Clock; use types::test_utils::generate_deterministic_keypairs; -use types::{Epoch, Fork, Slot}; +use types::{ChainSpec, Epoch, Fork, Slot}; //TODO: This service should be simplified in the future. Can be made more steamlined. /// The validator service. This is the main thread that executes and maintains validator /// duties. -pub struct Service { +pub struct Service { /// The node we currently connected to. connected_node_version: String, /// The chain id we are processing on. @@ -50,28 +49,25 @@ pub struct Service { slot_clock: SystemTimeSlotClock, /// The current slot we are processing. current_slot: Slot, - /// The number of slots per epoch to allow for converting slots to epochs. - slots_per_epoch: u64, + /// The chain specification for this clients instance. + spec: Arc, + /// The duties manager which maintains the state of when to perform actions. + duties_manager: Arc>, // GRPC Clients /// The beacon block GRPC client. - beacon_block_client: Arc, - /// The validator GRPC client. - validator_client: Arc, + beacon_block_client: Arc, /// The attester GRPC client. attester_client: Arc, /// The validator client logger. log: slog::Logger, } -impl Service { +impl Service { /// Initial connection to the beacon node to determine its properties. /// /// This tries to connect to a beacon node. Once connected, it initialised the gRPC clients /// and returns an instance of the service. - fn initialize_service( - config: &ValidatorConfig, - log: slog::Logger, - ) -> error_chain::Result { + fn initialize_service(config: ValidatorConfig, log: slog::Logger) -> error_chain::Result { // initialise the beacon node client to check for a connection let env = Arc::new(EnvBuilder::new().build()); @@ -139,7 +135,9 @@ impl Service { // Beacon node gRPC beacon block endpoints. let beacon_block_client = { let ch = ChannelBuilder::new(env.clone()).connect(&config.server); - Arc::new(BeaconBlockServiceClient::new(ch)) + let beacon_block_service_client = Arc::new(BeaconBlockServiceClient::new(ch)); + // a wrapper around the service client to implement the beacon block node trait + Arc::new(BeaconBlockGrpcClient::new(beacon_block_service_client)) }; // Beacon node gRPC validator endpoints. @@ -164,14 +162,37 @@ impl Service { .map_err(|e| ErrorKind::SlotClockError(e))? .expect("Genesis must be in the future"); + let spec = Arc::new(config.spec); + + /* Generate the duties manager */ + + // generate keypairs + + // TODO: keypairs are randomly generated; they should be loaded from a file or generated. + // https://github.com/sigp/lighthouse/issues/160 + let keypairs = Arc::new(generate_deterministic_keypairs(8)); + + // Builds a mapping of Epoch -> Map(PublicKey, EpochDuty) + // where EpochDuty contains slot numbers and attestation data that each validator needs to + // produce work on. + let duties_map = RwLock::new(EpochDutiesMap::new(config.spec.slots_per_epoch)); + + // builds a manager which maintains the list of current duties for all known validators + // and can check when a validator needs to perform a task. + let duties_manager = Arc::new(DutiesManager { + duties_map, + pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(), + beacon_node: validator_client, + }); + Ok(Self { connected_node_version: node_info.version, fork, slot_clock, current_slot, - slots_per_epoch: config.spec.slots_per_epoch, + spec, + duties_manager, beacon_block_client, - validator_client, attester_client, log, }) @@ -180,7 +201,7 @@ impl Service { /// Initialise the service then run the core thread. pub fn start(config: ValidatorConfig, log: slog::Logger) -> error_chain::Result<()> { // connect to the node and retrieve its properties and initialize the gRPC clients - let service = Service::initialize_service(&config, log)?; + let service = Service::initialize_service(config, log)?; // we have connected to a node and established its parameters. Spin up the core service @@ -205,137 +226,95 @@ impl Service { Interval::new(Instant::now() + duration_to_next_slot, slot_duration) }; - /* kick off core service */ - - // generate keypairs - - // TODO: keypairs are randomly generated; they should be loaded from a file or generated. - // https://github.com/sigp/lighthouse/issues/160 - let keypairs = Arc::new(generate_deterministic_keypairs(8)); - - /* build requisite objects to pass to core thread */ - - // Builds a mapping of Epoch -> Map(PublicKey, EpochDuty) - // where EpochDuty contains slot numbers and attestation data that each validator needs to - // produce work on. - let duties_map = RwLock::new(EpochDutiesMap::new(config.spec.slots_per_epoch)); - - // builds a manager which maintains the list of current duties for all known validators - // and can check when a validator needs to perform a task. - let manager = Arc::new(DutiesManager { - duties_map, - pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(), - beacon_node: service.validator_client.clone(), - }); - - // run the core thread + /* kick off the core service */ runtime.block_on( interval .for_each(move |_| { - let log = service.log.clone(); - - /* get the current slot and epoch */ - let current_slot = match service.slot_clock.present_slot() { - Err(e) => { - error!(log, "SystemTimeError {:?}", e); - return Ok(()); - } - Ok(slot) => slot.expect("Genesis is in the future"), - }; - - let current_epoch = current_slot.epoch(service.slots_per_epoch); - - debug_assert!( - current_slot > service.current_slot, - "The Timer should poll a new slot" - ); - - info!(log, "Processing"; "slot" => current_slot.as_u64(), "epoch" => current_epoch.as_u64()); - - /* check for new duties */ - - let cloned_manager = manager.clone(); - let cloned_log = log.clone(); - // spawn a new thread separate to the runtime - std::thread::spawn(move || { - let _empty_error = cloned_manager.run_update(current_epoch.clone(), cloned_log.clone()); - }); - - /* execute any specified duties */ - - if let Some(work) = manager.get_current_work(current_slot) { - for (_public_key, work_type) in work { - if work_type.produce_block { - // TODO: Produce a beacon block in a new thread - } - if work_type.attestation_duty.is_some() { - // available AttestationDuty info - let attestation_duty = - work_type.attestation_duty.expect("Cannot be None"); - //TODO: Produce an attestation in a new thread - } - } - } - + // if a non-fatal error occurs, proceed to the next slot. + let _ignore_error = service.per_slot_execution(); + // completed a slot process Ok(()) }) .map_err(|e| format!("Service thread failed: {:?}", e)), ); - - // completed a slot process + // validator client exited Ok(()) } - /* - // Spawn a new thread to perform block production for the validator. - let producer_thread = { - let spec = spec.clone(); - let signer = Arc::new(BlockProposerLocalSigner::new(keypair.clone())); - let duties_map = duties_map.clone(); - let slot_clock = slot_clock.clone(); - let log = log.clone(); - let client = Arc::new(BeaconBlockGrpcClient::new(beacon_block_grpc_client.clone())); - thread::spawn(move || { - let block_producer = - BlockProducer::new(spec, duties_map, slot_clock, client, signer); - let mut block_producer_service = BlockProducerService { - block_producer, - poll_interval_millis, - log, - }; + /// The execution logic that runs every slot. + // Errors are logged to output, and core execution continues unless fatal errors occur. + fn per_slot_execution(&mut self) -> error_chain::Result<()> { + /* get the new current slot and epoch */ + self.update_current_slot()?; - block_producer_service.run(); - }) - }; + /* check for new duties */ + self.check_for_duties(); - // Spawn a new thread for attestation for the validator. - let attester_thread = { - let signer = Arc::new(AttesterLocalSigner::new(keypair.clone())); - let epoch_map = epoch_map_for_attester.clone(); - let slot_clock = slot_clock.clone(); - let log = log.clone(); - let client = Arc::new(AttestationGrpcClient::new(attester_grpc_client.clone())); - thread::spawn(move || { - let attester = Attester::new(epoch_map, slot_clock, client, signer); - let mut attester_service = AttesterService { - attester, - poll_interval_millis, - log, - }; + /* process any required duties for validators */ + self.process_duties(); - attester_service.run(); - }) - }; - - threads.push((duties_manager_thread, producer_thread, attester_thread)); + Ok(()) } - // Naively wait for all the threads to complete. - for tuple in threads { - let (manager, producer, attester) = tuple; - let _ = producer.join(); - let _ = manager.join(); - let _ = attester.join(); + /// Updates the known current slot and epoch. + fn update_current_slot(&mut self) -> error_chain::Result<()> { + let current_slot = match self.slot_clock.present_slot() { + Err(e) => { + error!(self.log, "SystemTimeError {:?}", e); + return Err("Could not read system time".into()); + } + Ok(slot) => slot.expect("Genesis is in the future"), + }; + + let current_epoch = current_slot.epoch(self.spec.slots_per_epoch); + + // this is a fatal error. If the slot clock repeats, there is something wrong with + // the timer, terminate immediately. + assert!( + current_slot > self.current_slot, + "The Timer should poll a new slot" + ); + self.current_slot = current_slot; + info!(self.log, "Processing"; "slot" => current_slot.as_u64(), "epoch" => current_epoch.as_u64()); + Ok(()) + } + + /// For all known validator keypairs, update any known duties from the beacon node. + fn check_for_duties(&mut self) { + let cloned_manager = self.duties_manager.clone(); + let cloned_log = self.log.clone(); + let current_epoch = self.current_slot.epoch(self.spec.slots_per_epoch); + // spawn a new thread separate to the runtime + // TODO: Handle thread termination/timeout + std::thread::spawn(move || { + // the return value is a future which returns ready. + // built to be compatible with the tokio runtime. + let _empty = cloned_manager.run_update(current_epoch.clone(), cloned_log.clone()); + }); + } + + /// If there are any duties to process, spawn a separate thread and perform required actions. + fn process_duties(&mut self) { + if let Some(work) = self.duties_manager.get_current_work(self.current_slot) { + for (_public_key, work_type) in work { + if work_type.produce_block { + // spawns a thread to produce a beacon block + std::thread::spawn(move || { + let block_producer = BlockProducer { + fork: self.fork, + slot: self.current_slot, + spec: self.spec.clone(), + }; + }); + + // TODO: Produce a beacon block in a new thread + } + if work_type.attestation_duty.is_some() { + // available AttestationDuty info + let attestation_duty = work_type.attestation_duty.expect("Cannot be None"); + //TODO: Produce an attestation in a new thread + } + } + } } - */ }