diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b272520c5..600c453fd 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -290,7 +290,7 @@ where /// fork-choice rule). /// /// It is important to note that the `beacon_state` returned may not match the present slot. It - /// is the state as it was when the head block was recieved, which could be some slots prior to + /// is the state as it was when the head block was received, which could be some slots prior to /// now. pub fn head(&self) -> RwLockReadGuard { self.canonical_head.read() diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 8b8fcb5e6..d8d85a8a6 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -6,7 +6,7 @@ pub mod test_utils; pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock}; pub use self::checkpoint::CheckPoint; -pub use self::errors::BeaconChainError; +pub use self::errors::{BeaconChainError, BlockProductionError}; pub use db; pub use fork_choice; pub use parking_lot; diff --git a/beacon_node/beacon_chain/test_harness/src/validator_harness/direct_beacon_node.rs b/beacon_node/beacon_chain/test_harness/src/validator_harness/direct_beacon_node.rs index 7853459d7..2d2b9e84d 100644 --- a/beacon_node/beacon_chain/test_harness/src/validator_harness/direct_beacon_node.rs +++ b/beacon_node/beacon_chain/test_harness/src/validator_harness/direct_beacon_node.rs @@ -50,7 +50,7 @@ impl DirectBeaconNode { } impl AttesterBeaconNode for DirectBeaconNode { - fn produce_attestation( + fn produce_attestation_data( &self, _slot: Slot, shard: u64, diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 91a9f3a26..1a5ecbb53 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -22,13 +22,13 @@ pub fn run(client: &Client, executor: TaskExecutor, exit: Exi // build heartbeat logic here let heartbeat = move |_| { - debug!(log, "Temp heartbeat output"); + //debug!(log, "Temp heartbeat output"); //TODO: Remove this logic. Testing only let mut count = counter.lock().unwrap(); *count += 1; if *count % 5 == 0 { - debug!(log, "Sending Message"); + // debug!(log, "Sending Message"); network.send_message(); } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index b2d2b5a24..aee7eb466 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -161,7 +161,7 @@ fn network_service( libp2p_service.swarm.send_rpc(peer_id, rpc_event); } OutgoingMessage::NotifierTest => { - debug!(log, "Received message from notifier"); + // debug!(log, "Received message from notifier"); } }; } diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs new file mode 100644 index 000000000..abef49df1 --- /dev/null +++ b/beacon_node/rpc/src/attestation.rs @@ -0,0 +1,159 @@ +use crate::beacon_chain::BeaconChain; +use futures::Future; +use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; +use protos::services::{ + AttestationData as AttestationDataProto, ProduceAttestationDataRequest, + ProduceAttestationDataResponse, PublishAttestationRequest, PublishAttestationResponse, +}; +use protos::services_grpc::AttestationService; +use slog::{error, info, trace, warn}; +use ssz::{ssz_encode, Decodable}; +use std::sync::Arc; +use types::Attestation; + +#[derive(Clone)] +pub struct AttestationServiceInstance { + pub chain: Arc, + pub log: slog::Logger, +} + +impl AttestationService for AttestationServiceInstance { + /// Produce the `AttestationData` for signing by a validator. + fn produce_attestation_data( + &mut self, + ctx: RpcContext, + req: ProduceAttestationDataRequest, + sink: UnarySink, + ) { + trace!( + &self.log, + "Attempting to produce attestation at slot {}", + req.get_slot() + ); + + // verify the slot, drop lock on state afterwards + { + let slot_requested = req.get_slot(); + let state = self.chain.get_state(); + + // Start by performing some checks + // Check that the AttestionData is for the current slot (otherwise it will not be valid) + if slot_requested > state.slot.as_u64() { + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::OutOfRange, + Some(format!( + "AttestationData request for a slot that is in the future." + )), + )) + .map_err(move |e| { + error!(log_clone, "Failed to reply with failure {:?}: {:?}", req, e) + }); + return ctx.spawn(f); + } + // currently cannot handle past slots. TODO: Handle this case + else if slot_requested < state.slot.as_u64() { + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::InvalidArgument, + Some(format!( + "AttestationData request for a slot that is in the past." + )), + )) + .map_err(move |e| { + error!(log_clone, "Failed to reply with failure {:?}: {:?}", req, e) + }); + return ctx.spawn(f); + } + } + + // Then get the AttestationData from the beacon chain + let shard = req.get_shard(); + let attestation_data = match self.chain.produce_attestation_data(shard) { + Ok(v) => v, + Err(e) => { + // Could not produce an attestation + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::Unknown, + Some(format!("Could not produce an attestation: {:?}", e)), + )) + .map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e)); + return ctx.spawn(f); + } + }; + + let mut attestation_data_proto = AttestationDataProto::new(); + attestation_data_proto.set_ssz(ssz_encode(&attestation_data)); + + let mut resp = ProduceAttestationDataResponse::new(); + resp.set_attestation_data(attestation_data_proto); + + let error_log = self.log.clone(); + let f = sink + .success(resp) + .map_err(move |e| error!(error_log, "Failed to reply with success {:?}: {:?}", req, e)); + ctx.spawn(f) + } + + /// Accept some fully-formed `FreeAttestation` from the validator, + /// store it, and aggregate it into an `Attestation`. + fn publish_attestation( + &mut self, + ctx: RpcContext, + req: PublishAttestationRequest, + sink: UnarySink, + ) { + trace!(self.log, "Publishing attestation"); + + let mut resp = PublishAttestationResponse::new(); + let ssz_serialized_attestation = req.get_attestation().get_ssz(); + + let attestation = match Attestation::ssz_decode(ssz_serialized_attestation, 0) { + Ok((v, _index)) => v, + Err(_) => { + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::InvalidArgument, + Some("Invalid attestation".to_string()), + )) + .map_err(move |_| warn!(log_clone, "failed to reply {:?}", req)); + return ctx.spawn(f); + } + }; + + match self.chain.process_attestation(attestation) { + Ok(_) => { + // Attestation was successfully processed. + info!( + self.log, + "PublishAttestation"; + "type" => "valid_attestation", + ); + + resp.set_success(true); + } + Err(e) => { + // Attestation was invalid + warn!( + self.log, + "PublishAttestation"; + "type" => "invalid_attestation", + "error" => format!("{:?}", e), + ); + resp.set_success(false); + resp.set_msg(format!("InvalidAttestation: {:?}", e).as_bytes().to_vec()); + } + }; + + let error_log = self.log.clone(); + let f = sink + .success(resp) + .map_err(move |e| error!(error_log, "failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) + } +} diff --git a/beacon_node/rpc/src/beacon_attester.rs b/beacon_node/rpc/src/beacon_attester.rs deleted file mode 100644 index 36b6a40b2..000000000 --- a/beacon_node/rpc/src/beacon_attester.rs +++ /dev/null @@ -1,61 +0,0 @@ -use futures::Future; -use grpcio::{RpcContext, UnarySink}; -use protos::services::{ - Attestation as AttestationProto, ProduceAttestation, ProduceAttestationResponse, - ProduceAttestationRequest, PublishAttestationResponse, PublishAttestationRequest, - PublishAttestation -}; -use protos::services_grpc::BeaconBlockService; -use slog::Logger; - -#[derive(Clone)] -pub struct AttestationServiceInstance { - pub log: Logger, -} - -impl AttestationService for AttestationServiceInstance { - /// Produce a `BeaconBlock` for signing by a validator. - fn produce_attestation( - &mut self, - ctx: RpcContext, - req: ProduceAttestationRequest, - sink: UnarySink, - ) { - println!("producing attestation at slot {}", req.get_slot()); - - // TODO: build a legit block. - let mut attestation = AttestationProto::new(); - attestation.set_slot(req.get_slot()); - // TODO Set the shard to something legit. - attestation.set_shard(0); - attestation.set_block_root(b"cats".to_vec()); - - let mut resp = ProduceAttestationResponse::new(); - resp.set_attestation_data(attestation); - - let f = sink - .success(resp) - .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); - ctx.spawn(f) - } - - /// Accept some fully-formed `BeaconBlock`, process and publish it. - fn publish_attestation( - &mut self, - ctx: RpcContext, - req: PublishAttestationRequest, - sink: UnarySink, - ) { - println!("publishing attestation {:?}", req.get_block()); - - // TODO: actually process the block. - let mut resp = PublishAttestationResponse::new(); - - resp.set_success(true); - - let f = sink - .success(resp) - .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); - ctx.spawn(f) - } -} diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index 6978e0f0e..0b90f774a 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -2,7 +2,7 @@ use crate::beacon_chain::BeaconChain; use crossbeam_channel; use eth2_libp2p::PubsubMessage; use futures::Future; -use grpcio::{RpcContext, UnarySink}; +use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use network::NetworkMessage; use protos::services::{ BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse, @@ -10,10 +10,10 @@ use protos::services::{ }; use protos::services_grpc::BeaconBlockService; use slog::Logger; -use slog::{error, info, warn}; -use ssz::Decodable; +use slog::{error, info, trace, warn}; +use ssz::{ssz_encode, Decodable}; use std::sync::Arc; -use types::BeaconBlock; +use types::{BeaconBlock, Signature, Slot}; #[derive(Clone)] pub struct BeaconBlockServiceInstance { @@ -30,11 +30,44 @@ impl BeaconBlockService for BeaconBlockServiceInstance { req: ProduceBeaconBlockRequest, sink: UnarySink, ) { - println!("producing at slot {}", req.get_slot()); + trace!(self.log, "Generating a beacon block"; "req" => format!("{:?}", req)); + + // decode the request + // TODO: requested slot currently unused, see: https://github.com/sigp/lighthouse/issues/336 + let _requested_slot = Slot::from(req.get_slot()); + let randao_reveal = match Signature::ssz_decode(req.get_randao_reveal(), 0) { + Ok((reveal, _index)) => reveal, + Err(_) => { + // decode error, incorrect signature + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::InvalidArgument, + Some(format!("Invalid randao reveal signature")), + )) + .map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e)); + return ctx.spawn(f); + } + }; + + let produced_block = match self.chain.produce_block(randao_reveal) { + Ok((block, _state)) => block, + Err(e) => { + // could not produce a block + let log_clone = self.log.clone(); + warn!(self.log, "RPC Error"; "Error" => format!("Could not produce a block:{:?}",e)); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::Unknown, + Some(format!("Could not produce a block: {:?}", e)), + )) + .map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e)); + return ctx.spawn(f); + } + }; - // TODO: build a legit block. let mut block = BeaconBlockProto::new(); - block.set_ssz(b"cats".to_vec()); + block.set_ssz(ssz_encode(&produced_block)); let mut resp = ProduceBeaconBlockResponse::new(); resp.set_block(block); @@ -52,6 +85,8 @@ impl BeaconBlockService for BeaconBlockServiceInstance { req: PublishBeaconBlockRequest, sink: UnarySink, ) { + trace!(&self.log, "Attempting to publish a block"); + let mut resp = PublishBeaconBlockResponse::new(); let ssz_serialized_block = req.get_block().get_ssz(); diff --git a/beacon_node/rpc/src/beacon_chain.rs b/beacon_node/rpc/src/beacon_chain.rs index 0551a8024..ddc91b73c 100644 --- a/beacon_node/rpc/src/beacon_chain.rs +++ b/beacon_node/rpc/src/beacon_chain.rs @@ -2,12 +2,13 @@ use beacon_chain::BeaconChain as RawBeaconChain; use beacon_chain::{ db::ClientDB, fork_choice::ForkChoice, - parking_lot::RwLockReadGuard, + parking_lot::{RwLockReadGuard, RwLockWriteGuard}, slot_clock::SlotClock, - types::{BeaconState, ChainSpec}, + types::{BeaconState, ChainSpec, Signature}, + AttestationValidationError, BlockProductionError, }; pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome}; -use types::BeaconBlock; +use types::{Attestation, AttestationData, BeaconBlock}; /// The RPC's API to the beacon chain. pub trait BeaconChain: Send + Sync { @@ -15,8 +16,22 @@ pub trait BeaconChain: Send + Sync { fn get_state(&self) -> RwLockReadGuard; + fn get_mut_state(&self) -> RwLockWriteGuard; + fn process_block(&self, block: BeaconBlock) -> Result; + + fn produce_block( + &self, + randao_reveal: Signature, + ) -> Result<(BeaconBlock, BeaconState), BlockProductionError>; + + fn produce_attestation_data(&self, shard: u64) -> Result; + + fn process_attestation( + &self, + attestation: Attestation, + ) -> Result<(), AttestationValidationError>; } impl BeaconChain for RawBeaconChain @@ -33,10 +48,32 @@ where self.state.read() } + fn get_mut_state(&self) -> RwLockWriteGuard { + self.state.write() + } + fn process_block( &self, block: BeaconBlock, ) -> Result { self.process_block(block) } + + fn produce_block( + &self, + randao_reveal: Signature, + ) -> Result<(BeaconBlock, BeaconState), BlockProductionError> { + self.produce_block(randao_reveal) + } + + fn produce_attestation_data(&self, shard: u64) -> Result { + self.produce_attestation_data(shard) + } + + fn process_attestation( + &self, + attestation: Attestation, + ) -> Result<(), AttestationValidationError> { + self.process_attestation(attestation) + } } diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index 2d47b4a69..5aac4ce55 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -1,19 +1,22 @@ +mod attestation; mod beacon_block; pub mod beacon_chain; mod beacon_node; pub mod config; mod validator; +use self::attestation::AttestationServiceInstance; use self::beacon_block::BeaconBlockServiceInstance; use self::beacon_chain::BeaconChain; use self::beacon_node::BeaconNodeServiceInstance; use self::validator::ValidatorServiceInstance; pub use config::Config as RPCConfig; -use futures::{future, Future}; -use grpcio::{Environment, Server, ServerBuilder}; +use futures::Future; +use grpcio::{Environment, ServerBuilder}; use network::NetworkMessage; use protos::services_grpc::{ - create_beacon_block_service, create_beacon_node_service, create_validator_service, + create_attestation_service, create_beacon_block_service, create_beacon_node_service, + create_validator_service, }; use slog::{info, o, warn}; use std::sync::Arc; @@ -56,11 +59,19 @@ pub fn start_server( }; create_validator_service(instance) }; + let attestation_service = { + let instance = AttestationServiceInstance { + chain: beacon_chain.clone(), + log: log.clone(), + }; + create_attestation_service(instance) + }; let mut server = ServerBuilder::new(env) .register_service(beacon_block_service) .register_service(validator_service) .register_service(beacon_node_service) + .register_service(attestation_service) .bind(config.listen_address.to_string(), config.port) .build() .unwrap(); diff --git a/beacon_node/rpc/src/validator.rs b/beacon_node/rpc/src/validator.rs index 936c95f52..0a9d7015c 100644 --- a/beacon_node/rpc/src/validator.rs +++ b/beacon_node/rpc/src/validator.rs @@ -4,7 +4,7 @@ use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use protos::services::{ActiveValidator, GetDutiesRequest, GetDutiesResponse, ValidatorDuty}; use protos::services_grpc::ValidatorService; -use slog::{debug, info, warn, Logger}; +use slog::{trace, warn}; use ssz::Decodable; use std::sync::Arc; use types::{Epoch, RelativeEpoch}; @@ -12,7 +12,7 @@ use types::{Epoch, RelativeEpoch}; #[derive(Clone)] pub struct ValidatorServiceInstance { pub chain: Arc, - pub log: Logger, + pub log: slog::Logger, } //TODO: Refactor Errors @@ -27,14 +27,13 @@ impl ValidatorService for ValidatorServiceInstance { sink: UnarySink, ) { let validators = req.get_validators(); - debug!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch()); - - let epoch = Epoch::from(req.get_epoch()); - let mut resp = GetDutiesResponse::new(); - let resp_validators = resp.mut_active_validators(); + trace!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch()); let spec = self.chain.get_spec(); let state = self.chain.get_state(); + let epoch = Epoch::from(req.get_epoch()); + let mut resp = GetDutiesResponse::new(); + let resp_validators = resp.mut_active_validators(); let relative_epoch = match RelativeEpoch::from_epoch(state.slot.epoch(spec.slots_per_epoch), epoch) { @@ -157,6 +156,7 @@ impl ValidatorService for ValidatorServiceInstance { duty.set_committee_index(attestation_duties.committee_index as u64); duty.set_attestation_slot(attestation_duties.slot.as_u64()); duty.set_attestation_shard(attestation_duties.shard); + duty.set_committee_len(attestation_duties.committee_len as u64); active_validator.set_duty(duty); resp_validators.push(active_validator); diff --git a/eth2/attester/src/lib.rs b/eth2/attester/src/lib.rs index 270c1e4d7..a4295f005 100644 --- a/eth2/attester/src/lib.rs +++ b/eth2/attester/src/lib.rs @@ -94,7 +94,7 @@ impl Attester Result { - let attestation_data = match self.beacon_node.produce_attestation(slot, shard)? { + let attestation_data = match self.beacon_node.produce_attestation_data(slot, shard)? { Some(attestation_data) => attestation_data, None => return Ok(PollOutcome::BeaconNodeUnableToProduceAttestation(slot)), }; diff --git a/eth2/attester/src/test_utils/simulated_beacon_node.rs b/eth2/attester/src/test_utils/simulated_beacon_node.rs index 84a203cdb..d19f43422 100644 --- a/eth2/attester/src/test_utils/simulated_beacon_node.rs +++ b/eth2/attester/src/test_utils/simulated_beacon_node.rs @@ -26,7 +26,7 @@ impl SimulatedBeaconNode { } impl BeaconNode for SimulatedBeaconNode { - fn produce_attestation(&self, slot: Slot, shard: u64) -> ProduceResult { + fn produce_attestation_data(&self, slot: Slot, shard: u64) -> ProduceResult { *self.produce_input.write().unwrap() = Some((slot, shard)); match *self.produce_result.read().unwrap() { Some(ref r) => r.clone(), diff --git a/eth2/attester/src/traits.rs b/eth2/attester/src/traits.rs index 749c6e1a2..2fd6940af 100644 --- a/eth2/attester/src/traits.rs +++ b/eth2/attester/src/traits.rs @@ -14,7 +14,7 @@ pub enum PublishOutcome { /// Defines the methods required to produce and publish blocks on a Beacon Node. pub trait BeaconNode: Send + Sync { - fn produce_attestation( + fn produce_attestation_data( &self, slot: Slot, shard: u64, diff --git a/eth2/types/src/test_utils/testing_beacon_state_builder.rs b/eth2/types/src/test_utils/testing_beacon_state_builder.rs index e25da37e7..def58b0d7 100644 --- a/eth2/types/src/test_utils/testing_beacon_state_builder.rs +++ b/eth2/types/src/test_utils/testing_beacon_state_builder.rs @@ -6,6 +6,8 @@ use dirs; use log::debug; use rayon::prelude::*; use std::path::{Path, PathBuf}; +//TODO: testing only +use std::time::{Duration, SystemTime}; pub const KEYPAIRS_FILE: &str = "keypairs.raw_keypairs"; @@ -120,7 +122,12 @@ impl TestingBeaconStateBuilder { }) .collect(); - let genesis_time = 1554013000; // arbitrary + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() + - 30; + let genesis_time = now; // arbitrary let mut state = BeaconState::genesis( genesis_time, diff --git a/eth2/utils/bls/src/keypair.rs b/eth2/utils/bls/src/keypair.rs index 6feb2a585..2f0e794a6 100644 --- a/eth2/utils/bls/src/keypair.rs +++ b/eth2/utils/bls/src/keypair.rs @@ -1,5 +1,7 @@ use super::{PublicKey, SecretKey}; use serde_derive::{Deserialize, Serialize}; +use std::fmt; +use std::hash::{Hash, Hasher}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Keypair { @@ -19,3 +21,21 @@ impl Keypair { self.pk.concatenated_hex_id() } } + +impl Hash for Keypair { + /// Note: this is distinct from consensus serialization, it will produce a different hash. + /// + /// This method uses the uncompressed bytes, which are much faster to obtain than the + /// compressed bytes required for consensus serialization. + /// + /// Use `ssz::Encode` to obtain the bytes required for consensus hashing. + fn hash(&self, state: &mut H) { + self.pk.as_uncompressed_bytes().hash(state) + } +} + +impl fmt::Display for Keypair { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.pk) + } +} diff --git a/eth2/utils/bls/src/public_key.rs b/eth2/utils/bls/src/public_key.rs index 5a348f530..5c4c3204c 100644 --- a/eth2/utils/bls/src/public_key.rs +++ b/eth2/utils/bls/src/public_key.rs @@ -7,6 +7,7 @@ use ssz::{ decode_ssz_list, hash, ssz_encode, Decodable, DecodeError, Encodable, SszStream, TreeHash, }; use std::default; +use std::fmt; use std::hash::{Hash, Hasher}; /// A single BLS signature. @@ -54,6 +55,12 @@ impl PublicKey { } } +impl fmt::Display for PublicKey { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.concatenated_hex_id()) + } +} + impl default::Default for PublicKey { fn default() -> Self { let secret_key = SecretKey::random(); diff --git a/protos/src/services.proto b/protos/src/services.proto index dd82855a1..ecc75ee26 100644 --- a/protos/src/services.proto +++ b/protos/src/services.proto @@ -19,7 +19,9 @@ service BeaconNodeService { /// Service that handles block production service BeaconBlockService { + // Requests a block to be signed from the beacon node. rpc ProduceBeaconBlock(ProduceBeaconBlockRequest) returns (ProduceBeaconBlockResponse); + // Responds to the node the signed block to be published. rpc PublishBeaconBlock(PublishBeaconBlockRequest) returns (PublishBeaconBlockResponse); } @@ -33,7 +35,7 @@ service ValidatorService { /// Service that handles validator attestations service AttestationService { - rpc ProduceAttestation(ProduceAttestationRequest) returns (ProduceAttestationResponse); + rpc ProduceAttestationData(ProduceAttestationDataRequest) returns (ProduceAttestationDataResponse); rpc PublishAttestation(PublishAttestationRequest) returns (PublishAttestationResponse); } @@ -64,6 +66,7 @@ message Empty {} // Validator requests an unsigned proposal. message ProduceBeaconBlockRequest { uint64 slot = 1; + bytes randao_reveal = 2; } // Beacon node returns an unsigned proposal. @@ -122,23 +125,28 @@ message ValidatorDuty { uint64 attestation_slot = 3; uint64 attestation_shard = 4; uint64 committee_index = 5; + uint64 committee_len = 6; } /* * Attestation Service Messages */ -message ProduceAttestationRequest { +message ProduceAttestationDataRequest { uint64 slot = 1; uint64 shard = 2; } -message ProduceAttestationResponse { - Attestation attestation_data = 1; +message ProduceAttestationDataResponse { + AttestationData attestation_data = 1; } message PublishAttestationRequest { - FreeAttestation free_attestation = 1; + Attestation attestation = 1; +} + +message Attestation { + bytes ssz = 1; } message PublishAttestationResponse { @@ -146,26 +154,6 @@ message PublishAttestationResponse { bytes msg = 2; } -message Crosslink { - uint64 epoch = 1; - bytes crosslink_data_root = 2; - -} - -message Attestation { - uint64 slot = 1; - uint64 shard = 2; - bytes beacon_block_root = 3; - bytes epoch_boundary_root = 4; - bytes crosslink_data_root = 5; - Crosslink latest_crosslink = 6; - uint64 justified_epoch = 7; - bytes justified_block_root = 8; - -} - -message FreeAttestation { - Attestation attestation_data = 1; - bytes signature = 2; - uint64 validator_index = 3; +message AttestationData { + bytes ssz = 1; } diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 570e06d74..80477c8ea 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -12,7 +12,6 @@ path = "src/main.rs" name = "validator_client" path = "src/lib.rs" - [dependencies] block_proposer = { path = "../eth2/block_proposer" } attester = { path = "../eth2/attester" } diff --git a/validator_client/src/attestation_producer/beacon_node_attestation.rs b/validator_client/src/attestation_producer/beacon_node_attestation.rs new file mode 100644 index 000000000..b5ff777de --- /dev/null +++ b/validator_client/src/attestation_producer/beacon_node_attestation.rs @@ -0,0 +1,23 @@ +//TODO: generalise these enums to the crate +use crate::block_producer::{BeaconNodeError, PublishOutcome}; +use types::{Attestation, AttestationData, Slot}; + +/// Defines the methods required to produce and publish attestations on a Beacon Node. Abstracts the +/// actual beacon node. +pub trait BeaconNodeAttestation: Send + Sync { + /// Request that the node produces the required attestation data. + /// + fn produce_attestation_data( + &self, + slot: Slot, + shard: u64, + ) -> Result; + + /// Request that the node publishes a attestation. + /// + /// Returns `true` if the publish was successful. + fn publish_attestation( + &self, + attestation: Attestation, + ) -> Result; +} diff --git a/validator_client/src/attestation_producer/grpc.rs b/validator_client/src/attestation_producer/grpc.rs new file mode 100644 index 000000000..900a92f32 --- /dev/null +++ b/validator_client/src/attestation_producer/grpc.rs @@ -0,0 +1,57 @@ +use super::beacon_node_attestation::BeaconNodeAttestation; +use crate::block_producer::{BeaconNodeError, PublishOutcome}; +use protos::services_grpc::AttestationServiceClient; +use ssz::{ssz_encode, Decodable}; + +use protos::services::{ + Attestation as GrpcAttestation, ProduceAttestationDataRequest, PublishAttestationRequest, +}; +use types::{Attestation, AttestationData, Slot}; + +impl BeaconNodeAttestation for AttestationServiceClient { + fn produce_attestation_data( + &self, + slot: Slot, + shard: u64, + ) -> Result { + let mut req = ProduceAttestationDataRequest::new(); + req.set_slot(slot.as_u64()); + req.set_shard(shard); + + let reply = self + .produce_attestation_data(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + let (attestation_data, _index) = + AttestationData::ssz_decode(reply.get_attestation_data().get_ssz(), 0) + .map_err(|_| BeaconNodeError::DecodeFailure)?; + Ok(attestation_data) + } + + fn publish_attestation( + &self, + attestation: Attestation, + ) -> Result { + let mut req = PublishAttestationRequest::new(); + + let ssz = ssz_encode(&attestation); + + let mut grpc_attestation = GrpcAttestation::new(); + grpc_attestation.set_ssz(ssz); + + req.set_attestation(grpc_attestation); + + let reply = self + .publish_attestation(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + if reply.get_success() { + Ok(PublishOutcome::Valid) + } else { + // TODO: distinguish between different errors + Ok(PublishOutcome::InvalidAttestation( + "Publish failed".to_string(), + )) + } + } +} diff --git a/validator_client/src/attestation_producer/mod.rs b/validator_client/src/attestation_producer/mod.rs new file mode 100644 index 000000000..0fbc7bcba --- /dev/null +++ b/validator_client/src/attestation_producer/mod.rs @@ -0,0 +1,165 @@ +mod beacon_node_attestation; +mod grpc; + +use std::sync::Arc; +use types::{ChainSpec, Domain, Fork}; +//TODO: Move these higher up in the crate +use super::block_producer::{BeaconNodeError, PublishOutcome, ValidatorEvent}; +use crate::signer::Signer; +use beacon_node_attestation::BeaconNodeAttestation; +use slog::{error, info, warn}; +use ssz::TreeHash; +use types::{ + AggregateSignature, Attestation, AttestationData, AttestationDataAndCustodyBit, + AttestationDuty, Bitfield, +}; + +//TODO: Group these errors at a crate level +#[derive(Debug, PartialEq)] +pub enum Error { + BeaconNodeError(BeaconNodeError), +} + +impl From for Error { + fn from(e: BeaconNodeError) -> Error { + Error::BeaconNodeError(e) + } +} + +/// This struct contains the logic for requesting and signing beacon attestations for a validator. The +/// validator can abstractly sign via the Signer trait object. +pub struct AttestationProducer<'a, B: BeaconNodeAttestation, S: Signer> { + /// The current fork. + pub fork: Fork, + /// The attestation duty to perform. + pub duty: AttestationDuty, + /// The current epoch. + pub spec: Arc, + /// The beacon node to connect to. + pub beacon_node: Arc, + /// The signer to sign the block. + pub signer: &'a S, +} + +impl<'a, B: BeaconNodeAttestation, S: Signer> AttestationProducer<'a, B, S> { + /// Handle outputs and results from attestation production. + pub fn handle_produce_attestation(&mut self, log: slog::Logger) { + match self.produce_attestation() { + Ok(ValidatorEvent::AttestationProduced(_slot)) => { + info!(log, "Attestation produced"; "Validator" => format!("{}", self.signer)) + } + Err(e) => error!(log, "Attestation production error"; "Error" => format!("{:?}", e)), + Ok(ValidatorEvent::SignerRejection(_slot)) => { + error!(log, "Attestation production error"; "Error" => "Signer could not sign the attestation".to_string()) + } + Ok(ValidatorEvent::SlashableAttestationNotProduced(_slot)) => { + error!(log, "Attestation production error"; "Error" => "Rejected the attestation as it could have been slashed".to_string()) + } + Ok(ValidatorEvent::PublishAttestationFailed) => { + error!(log, "Attestation production error"; "Error" => "Beacon node was unable to publish an attestation".to_string()) + } + Ok(ValidatorEvent::InvalidAttestation) => { + error!(log, "Attestation production error"; "Error" => "The signed attestation was invalid".to_string()) + } + Ok(v) => { + warn!(log, "Unknown result for attestation production"; "Error" => format!("{:?}",v)) + } + } + } + + /// Produce an attestation, sign it and send it back + /// + /// Assumes that an attestation is required at this slot (does not check the duties). + /// + /// Ensures the message is not slashable. + /// + /// !!! UNSAFE !!! + /// + /// The slash-protection code is not yet implemented. There is zero protection against + /// slashing. + pub fn produce_attestation(&mut self) -> Result { + let epoch = self.duty.slot.epoch(self.spec.slots_per_epoch); + + let attestation = self + .beacon_node + .produce_attestation_data(self.duty.slot, self.duty.shard)?; + if self.safe_to_produce(&attestation) { + let domain = self.spec.get_domain(epoch, Domain::Attestation, &self.fork); + if let Some(attestation) = self.sign_attestation(attestation, self.duty, domain) { + match self.beacon_node.publish_attestation(attestation) { + Ok(PublishOutcome::InvalidAttestation(_string)) => { + Ok(ValidatorEvent::InvalidAttestation) + } + Ok(PublishOutcome::Valid) => { + Ok(ValidatorEvent::AttestationProduced(self.duty.slot)) + } + Err(_) | Ok(_) => Ok(ValidatorEvent::PublishAttestationFailed), + } + } else { + Ok(ValidatorEvent::SignerRejection(self.duty.slot)) + } + } else { + Ok(ValidatorEvent::SlashableAttestationNotProduced( + self.duty.slot, + )) + } + } + + /// Consumes an attestation, returning the attestation signed by the validators private key. + /// + /// Important: this function will not check to ensure the attestation is not slashable. This must be + /// done upstream. + fn sign_attestation( + &mut self, + attestation: AttestationData, + duties: AttestationDuty, + domain: u64, + ) -> Option { + self.store_produce(&attestation); + + // build the aggregate signature + let aggregate_signature = { + let message = AttestationDataAndCustodyBit { + data: attestation.clone(), + custody_bit: false, + } + .hash_tree_root(); + + let sig = self.signer.sign_message(&message, domain)?; + + let mut agg_sig = AggregateSignature::new(); + agg_sig.add(&sig); + agg_sig + }; + + let mut aggregation_bitfield = Bitfield::with_capacity(duties.committee_len); + let custody_bitfield = Bitfield::with_capacity(duties.committee_len); + aggregation_bitfield.set(duties.committee_index, true); + + Some(Attestation { + aggregation_bitfield, + data: attestation, + custody_bitfield, + aggregate_signature, + }) + } + + /// Returns `true` if signing an attestation is safe (non-slashable). + /// + /// !!! UNSAFE !!! + /// + /// Important: this function is presently stubbed-out. It provides ZERO SAFETY. + fn safe_to_produce(&self, _attestation: &AttestationData) -> bool { + //TODO: Implement slash protection + true + } + + /// Record that an attestation was produced so that slashable votes may not be made in the future. + /// + /// !!! UNSAFE !!! + /// + /// Important: this function is presently stubbed-out. It provides ZERO SAFETY. + fn store_produce(&mut self, _attestation: &AttestationData) { + // TODO: Implement slash protection + } +} diff --git a/validator_client/src/attester_service/attestation_grpc_client.rs b/validator_client/src/attester_service/attestation_grpc_client.rs deleted file mode 100644 index 5a4701ba9..000000000 --- a/validator_client/src/attester_service/attestation_grpc_client.rs +++ /dev/null @@ -1,44 +0,0 @@ -use protos::services_grpc::AttestationServiceClient; -use std::sync::Arc; - -use attester::{BeaconNode, BeaconNodeError, PublishOutcome}; -use protos::services::ProduceAttestationRequest; -use types::{AttestationData, FreeAttestation, Slot}; - -pub struct AttestationGrpcClient { - client: Arc, -} - -impl AttestationGrpcClient { - pub fn new(client: Arc) -> Self { - Self { client } - } -} - -impl BeaconNode for AttestationGrpcClient { - fn produce_attestation( - &self, - slot: Slot, - shard: u64, - ) -> Result, BeaconNodeError> { - let mut req = ProduceAttestationRequest::new(); - req.set_slot(slot.as_u64()); - req.set_shard(shard); - - let reply = self - .client - .produce_attestation(&req) - .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; - - // TODO: return correct Attestation - Err(BeaconNodeError::DecodeFailure) - } - - fn publish_attestation( - &self, - free_attestation: FreeAttestation, - ) -> Result { - // TODO: return correct PublishOutcome - Err(BeaconNodeError::DecodeFailure) - } -} diff --git a/validator_client/src/attester_service/mod.rs b/validator_client/src/attester_service/mod.rs deleted file mode 100644 index fe5de7647..000000000 --- a/validator_client/src/attester_service/mod.rs +++ /dev/null @@ -1,54 +0,0 @@ -mod attestation_grpc_client; -use attester::{Attester, BeaconNode, DutiesReader, PollOutcome as AttesterPollOutcome, Signer}; -use slog::{error, info, warn, Logger}; -use slot_clock::SlotClock; -use std::time::Duration; - -pub use self::attestation_grpc_client::AttestationGrpcClient; - -pub struct AttesterService { - pub attester: Attester, - pub poll_interval_millis: u64, - pub log: Logger, -} - -impl AttesterService { - /// Run a loop which polls the Attester each `poll_interval_millis` millseconds. - /// - /// Logs the results of the polls. - pub fn run(&mut self) { - loop { - match self.attester.poll() { - Err(error) => { - error!(self.log, "Attester poll error"; "error" => format!("{:?}", error)) - } - Ok(AttesterPollOutcome::AttestationProduced(slot)) => { - info!(self.log, "Produced Attestation"; "slot" => slot) - } - Ok(AttesterPollOutcome::SlashableAttestationNotProduced(slot)) => { - warn!(self.log, "Slashable attestation was not produced"; "slot" => slot) - } - Ok(AttesterPollOutcome::AttestationNotRequired(slot)) => { - info!(self.log, "Attestation not required"; "slot" => slot) - } - Ok(AttesterPollOutcome::ProducerDutiesUnknown(slot)) => { - error!(self.log, "Attestation duties unknown"; "slot" => slot) - } - Ok(AttesterPollOutcome::SlotAlreadyProcessed(slot)) => { - warn!(self.log, "Attempted to re-process slot"; "slot" => slot) - } - Ok(AttesterPollOutcome::BeaconNodeUnableToProduceAttestation(slot)) => { - error!(self.log, "Beacon node unable to produce attestation"; "slot" => slot) - } - Ok(AttesterPollOutcome::SignerRejection(slot)) => { - error!(self.log, "The cryptographic signer refused to sign the attestation"; "slot" => slot) - } - Ok(AttesterPollOutcome::ValidatorIsUnknown(slot)) => { - error!(self.log, "The Beacon Node does not recognise the validator"; "slot" => slot) - } - }; - - std::thread::sleep(Duration::from_millis(self.poll_interval_millis)); - } - } -} diff --git a/validator_client/src/block_producer/beacon_node_block.rs b/validator_client/src/block_producer/beacon_node_block.rs new file mode 100644 index 000000000..65ccb2104 --- /dev/null +++ b/validator_client/src/block_producer/beacon_node_block.rs @@ -0,0 +1,31 @@ +use types::{BeaconBlock, Signature, Slot}; +#[derive(Debug, PartialEq, Clone)] +pub enum BeaconNodeError { + RemoteFailure(String), + DecodeFailure, +} + +#[derive(Debug, PartialEq, Clone)] +pub enum PublishOutcome { + Valid, + InvalidBlock(String), + InvalidAttestation(String), +} + +/// Defines the methods required to produce and publish blocks on a Beacon Node. Abstracts the +/// actual beacon node. +pub trait BeaconNodeBlock: Send + Sync { + /// Request that the node produces a block. + /// + /// Returns Ok(None) if the Beacon Node is unable to produce at the given slot. + fn produce_beacon_block( + &self, + slot: Slot, + randao_reveal: &Signature, + ) -> Result, BeaconNodeError>; + + /// Request that the node publishes a block. + /// + /// Returns `true` if the publish was successful. + fn publish_beacon_block(&self, block: BeaconBlock) -> Result; +} diff --git a/validator_client/src/block_producer_service/beacon_block_grpc_client.rs b/validator_client/src/block_producer/grpc.rs similarity index 86% rename from validator_client/src/block_producer_service/beacon_block_grpc_client.rs rename to validator_client/src/block_producer/grpc.rs index ba2acfffb..fd1a70fa2 100644 --- a/validator_client/src/block_producer_service/beacon_block_grpc_client.rs +++ b/validator_client/src/block_producer/grpc.rs @@ -1,4 +1,4 @@ -use block_proposer::{BeaconNode, BeaconNodeError, PublishOutcome}; +use super::beacon_node_block::*; use protos::services::{ BeaconBlock as GrpcBeaconBlock, ProduceBeaconBlockRequest, PublishBeaconBlockRequest, }; @@ -7,6 +7,7 @@ use ssz::{ssz_encode, Decodable}; use std::sync::Arc; use types::{BeaconBlock, Signature, Slot}; +//TODO: Remove this new type. Do not need to wrap /// A newtype designed to wrap the gRPC-generated service so the `BeaconNode` trait may be /// implemented upon it. pub struct BeaconBlockGrpcClient { @@ -19,7 +20,7 @@ impl BeaconBlockGrpcClient { } } -impl BeaconNode for BeaconBlockGrpcClient { +impl BeaconNodeBlock for BeaconBlockGrpcClient { /// Request a Beacon Node (BN) to produce a new block at the supplied slot. /// /// Returns `None` if it is not possible to produce at the supplied slot. For example, if the @@ -27,17 +28,20 @@ impl BeaconNode for BeaconBlockGrpcClient { fn produce_beacon_block( &self, slot: Slot, - // TODO: use randao_reveal, when proto APIs have been updated. - _randao_reveal: &Signature, + randao_reveal: &Signature, ) -> Result, BeaconNodeError> { + // request a beacon block from the node let mut req = ProduceBeaconBlockRequest::new(); req.set_slot(slot.as_u64()); + req.set_randao_reveal(ssz_encode(randao_reveal)); + //TODO: Determine if we want an explicit timeout let reply = self .client .produce_beacon_block(&req) .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + // format the reply if reply.has_block() { let block = reply.get_block(); let ssz = block.get_ssz(); @@ -60,7 +64,6 @@ impl BeaconNode for BeaconBlockGrpcClient { let ssz = ssz_encode(&block); - // TODO: this conversion is incomplete; fix it. let mut grpc_block = GrpcBeaconBlock::new(); grpc_block.set_ssz(ssz); @@ -72,7 +75,7 @@ impl BeaconNode for BeaconBlockGrpcClient { .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; if reply.get_success() { - Ok(PublishOutcome::ValidBlock) + Ok(PublishOutcome::Valid) } else { // TODO: distinguish between different errors Ok(PublishOutcome::InvalidBlock("Publish failed".to_string())) diff --git a/validator_client/src/block_producer/mod.rs b/validator_client/src/block_producer/mod.rs new file mode 100644 index 000000000..8b4f5abda --- /dev/null +++ b/validator_client/src/block_producer/mod.rs @@ -0,0 +1,242 @@ +mod beacon_node_block; +mod grpc; + +use self::beacon_node_block::BeaconNodeBlock; +pub use self::beacon_node_block::{BeaconNodeError, PublishOutcome}; +pub use self::grpc::BeaconBlockGrpcClient; +use crate::signer::Signer; +use slog::{error, info, warn}; +use ssz::{SignedRoot, TreeHash}; +use std::sync::Arc; +use types::{BeaconBlock, ChainSpec, Domain, Fork, Slot}; + +#[derive(Debug, PartialEq)] +pub enum Error { + BeaconNodeError(BeaconNodeError), +} + +#[derive(Debug, PartialEq)] +pub enum ValidatorEvent { + /// A new block was produced. + BlockProduced(Slot), + /// A new attestation was produced. + AttestationProduced(Slot), + /// A block was not produced as it would have been slashable. + SlashableBlockNotProduced(Slot), + /// An attestation was not produced as it would have been slashable. + SlashableAttestationNotProduced(Slot), + /// The Beacon Node was unable to produce a block at that slot. + BeaconNodeUnableToProduceBlock(Slot), + /// The signer failed to sign the message. + SignerRejection(Slot), + /// Publishing an attestation failed. + PublishAttestationFailed, + /// Beacon node rejected the attestation. + InvalidAttestation, +} + +/// This struct contains the logic for requesting and signing beacon blocks for a validator. The +/// validator can abstractly sign via the Signer trait object. +pub struct BlockProducer<'a, B: BeaconNodeBlock, S: Signer> { + /// The current fork. + pub fork: Fork, + /// The current slot to produce a block for. + pub slot: Slot, + /// The current epoch. + pub spec: Arc, + /// The beacon node to connect to. + pub beacon_node: Arc, + /// The signer to sign the block. + pub signer: &'a S, +} + +impl<'a, B: BeaconNodeBlock, S: Signer> BlockProducer<'a, B, S> { + /// Handle outputs and results from block production. + pub fn handle_produce_block(&mut self, log: slog::Logger) { + match self.produce_block() { + Ok(ValidatorEvent::BlockProduced(_slot)) => { + info!(log, "Block produced"; "Validator" => format!("{}", self.signer)) + } + Err(e) => error!(log, "Block production error"; "Error" => format!("{:?}", e)), + Ok(ValidatorEvent::SignerRejection(_slot)) => { + error!(log, "Block production error"; "Error" => "Signer Could not sign the block".to_string()) + } + Ok(ValidatorEvent::SlashableBlockNotProduced(_slot)) => { + error!(log, "Block production error"; "Error" => "Rejected the block as it could have been slashed".to_string()) + } + Ok(ValidatorEvent::BeaconNodeUnableToProduceBlock(_slot)) => { + error!(log, "Block production error"; "Error" => "Beacon node was unable to produce a block".to_string()) + } + Ok(v) => { + warn!(log, "Unknown result for block production"; "Error" => format!("{:?}",v)) + } + } + } + + /// Produce a block at some slot. + /// + /// Assumes that a block is required at this slot (does not check the duties). + /// + /// Ensures the message is not slashable. + /// + /// !!! UNSAFE !!! + /// + /// The slash-protection code is not yet implemented. There is zero protection against + /// slashing. + pub fn produce_block(&mut self) -> Result { + let epoch = self.slot.epoch(self.spec.slots_per_epoch); + + let message = epoch.hash_tree_root(); + let randao_reveal = match self.signer.sign_message( + &message, + self.spec.get_domain(epoch, Domain::Randao, &self.fork), + ) { + None => return Ok(ValidatorEvent::SignerRejection(self.slot)), + Some(signature) => signature, + }; + + if let Some(block) = self + .beacon_node + .produce_beacon_block(self.slot, &randao_reveal)? + { + if self.safe_to_produce(&block) { + let domain = self.spec.get_domain(epoch, Domain::BeaconBlock, &self.fork); + if let Some(block) = self.sign_block(block, domain) { + self.beacon_node.publish_beacon_block(block)?; + Ok(ValidatorEvent::BlockProduced(self.slot)) + } else { + Ok(ValidatorEvent::SignerRejection(self.slot)) + } + } else { + Ok(ValidatorEvent::SlashableBlockNotProduced(self.slot)) + } + } else { + Ok(ValidatorEvent::BeaconNodeUnableToProduceBlock(self.slot)) + } + } + + /// Consumes a block, returning that block signed by the validators private key. + /// + /// Important: this function will not check to ensure the block is not slashable. This must be + /// done upstream. + fn sign_block(&mut self, mut block: BeaconBlock, domain: u64) -> Option { + self.store_produce(&block); + + match self.signer.sign_message(&block.signed_root()[..], domain) { + None => None, + Some(signature) => { + block.signature = signature; + Some(block) + } + } + } + + /// Returns `true` if signing a block is safe (non-slashable). + /// + /// !!! UNSAFE !!! + /// + /// Important: this function is presently stubbed-out. It provides ZERO SAFETY. + fn safe_to_produce(&self, _block: &BeaconBlock) -> bool { + // TODO: ensure the producer doesn't produce slashable blocks. + // https://github.com/sigp/lighthouse/issues/160 + true + } + + /// Record that a block was produced so that slashable votes may not be made in the future. + /// + /// !!! UNSAFE !!! + /// + /// Important: this function is presently stubbed-out. It provides ZERO SAFETY. + fn store_produce(&mut self, _block: &BeaconBlock) { + // TODO: record this block production to prevent future slashings. + // https://github.com/sigp/lighthouse/issues/160 + } +} + +impl From for Error { + fn from(e: BeaconNodeError) -> Error { + Error::BeaconNodeError(e) + } +} + +/* Old tests - Re-work for new logic +#[cfg(test)] +mod tests { + use super::test_utils::{EpochMap, LocalSigner, SimulatedBeaconNode}; + use super::*; + use slot_clock::TestingSlotClock; + use types::{ + test_utils::{SeedableRng, TestRandom, XorShiftRng}, + Keypair, + }; + + // TODO: implement more thorough testing. + // https://github.com/sigp/lighthouse/issues/160 + // + // These tests should serve as a good example for future tests. + + #[test] + pub fn polling() { + let mut rng = XorShiftRng::from_seed([42; 16]); + + let spec = Arc::new(ChainSpec::foundation()); + let slot_clock = Arc::new(TestingSlotClock::new(0)); + let beacon_node = Arc::new(SimulatedBeaconNode::default()); + let signer = Arc::new(LocalSigner::new(Keypair::random())); + + let mut epoch_map = EpochMap::new(spec.slots_per_epoch); + let produce_slot = Slot::new(100); + let produce_epoch = produce_slot.epoch(spec.slots_per_epoch); + epoch_map.map.insert(produce_epoch, produce_slot); + let epoch_map = Arc::new(epoch_map); + + let mut block_proposer = BlockProducer::new( + spec.clone(), + epoch_map.clone(), + slot_clock.clone(), + beacon_node.clone(), + signer.clone(), + ); + + // Configure responses from the BeaconNode. + beacon_node.set_next_produce_result(Ok(Some(BeaconBlock::random_for_test(&mut rng)))); + beacon_node.set_next_publish_result(Ok(PublishOutcome::ValidBlock)); + + // One slot before production slot... + slot_clock.set_slot(produce_slot.as_u64() - 1); + assert_eq!( + block_proposer.poll(), + Ok(PollOutcome::BlockProductionNotRequired(produce_slot - 1)) + ); + + // On the produce slot... + slot_clock.set_slot(produce_slot.as_u64()); + assert_eq!( + block_proposer.poll(), + Ok(PollOutcome::BlockProduced(produce_slot.into())) + ); + + // Trying the same produce slot again... + slot_clock.set_slot(produce_slot.as_u64()); + assert_eq!( + block_proposer.poll(), + Ok(PollOutcome::SlotAlreadyProcessed(produce_slot)) + ); + + // One slot after the produce slot... + slot_clock.set_slot(produce_slot.as_u64() + 1); + assert_eq!( + block_proposer.poll(), + Ok(PollOutcome::BlockProductionNotRequired(produce_slot + 1)) + ); + + // In an epoch without known duties... + let slot = (produce_epoch.as_u64() + 1) * spec.slots_per_epoch; + slot_clock.set_slot(slot); + assert_eq!( + block_proposer.poll(), + Ok(PollOutcome::ProducerDutiesUnknown(Slot::new(slot))) + ); + } +} +*/ diff --git a/validator_client/src/block_producer_service/mod.rs b/validator_client/src/block_producer_service/mod.rs deleted file mode 100644 index 91e7606a7..000000000 --- a/validator_client/src/block_producer_service/mod.rs +++ /dev/null @@ -1,61 +0,0 @@ -mod beacon_block_grpc_client; -// mod block_producer_service; - -use block_proposer::{ - BeaconNode, BlockProducer, DutiesReader, PollOutcome as BlockProducerPollOutcome, Signer, -}; -use slog::{error, info, warn, Logger}; -use slot_clock::SlotClock; -use std::time::Duration; - -pub use self::beacon_block_grpc_client::BeaconBlockGrpcClient; - -pub struct BlockProducerService { - pub block_producer: BlockProducer, - pub poll_interval_millis: u64, - pub log: Logger, -} - -impl BlockProducerService { - /// Run a loop which polls the block producer each `poll_interval_millis` millseconds. - /// - /// Logs the results of the polls. - pub fn run(&mut self) { - loop { - match self.block_producer.poll() { - Err(error) => { - error!(self.log, "Block producer poll error"; "error" => format!("{:?}", error)) - } - Ok(BlockProducerPollOutcome::BlockProduced(slot)) => { - info!(self.log, "Produced block"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::SlashableBlockNotProduced(slot)) => { - warn!(self.log, "Slashable block was not signed"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::BlockProductionNotRequired(slot)) => { - info!(self.log, "Block production not required"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::ProducerDutiesUnknown(slot)) => { - error!(self.log, "Block production duties unknown"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::SlotAlreadyProcessed(slot)) => { - warn!(self.log, "Attempted to re-process slot"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::BeaconNodeUnableToProduceBlock(slot)) => { - error!(self.log, "Beacon node unable to produce block"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::SignerRejection(slot)) => { - error!(self.log, "The cryptographic signer refused to sign the block"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::ValidatorIsUnknown(slot)) => { - error!(self.log, "The Beacon Node does not recognise the validator"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::UnableToGetFork(slot)) => { - error!(self.log, "Unable to get a `Fork` struct to generate signature domains"; "slot" => slot) - } - }; - - std::thread::sleep(Duration::from_millis(self.poll_interval_millis)); - } - } -} diff --git a/validator_client/src/duties/traits.rs b/validator_client/src/duties/beacon_node_duties.rs similarity index 74% rename from validator_client/src/duties/traits.rs rename to validator_client/src/duties/beacon_node_duties.rs index 374bed9f6..af1fab60b 100644 --- a/validator_client/src/duties/traits.rs +++ b/validator_client/src/duties/beacon_node_duties.rs @@ -2,12 +2,12 @@ use super::EpochDuties; use types::{Epoch, PublicKey}; #[derive(Debug, PartialEq, Clone)] -pub enum BeaconNodeError { +pub enum BeaconNodeDutiesError { RemoteFailure(String), } /// Defines the methods required to obtain a validators shuffling from a Beacon Node. -pub trait BeaconNode: Send + Sync { +pub trait BeaconNodeDuties: Send + Sync { /// Gets the duties for all validators. /// /// Returns a vector of EpochDuties for each validator public key. The entry will be None for @@ -15,6 +15,6 @@ pub trait BeaconNode: Send + Sync { fn request_duties( &self, epoch: Epoch, - pubkeys: &[PublicKey], - ) -> Result; + pub_keys: &[PublicKey], + ) -> Result; } diff --git a/validator_client/src/duties/epoch_duties.rs b/validator_client/src/duties/epoch_duties.rs index 5c23e82b1..692a8d6a6 100644 --- a/validator_client/src/duties/epoch_duties.rs +++ b/validator_client/src/duties/epoch_duties.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::fmt; use std::ops::{Deref, DerefMut}; use types::{AttestationDuty, Epoch, PublicKey, Slot}; @@ -22,9 +23,7 @@ pub struct WorkInfo { #[derive(Debug, PartialEq, Clone, Copy, Default)] pub struct EpochDuty { pub block_production_slot: Option, - pub attestation_slot: Slot, - pub attestation_shard: u64, - pub committee_index: u64, + pub attestation_duty: AttestationDuty, } impl EpochDuty { @@ -38,12 +37,8 @@ impl EpochDuty { // if the validator is required to attest to a shard, create the data let mut attestation_duty = None; - if self.attestation_slot == slot { - attestation_duty = Some(AttestationDuty { - slot, - shard: self.attestation_shard, - committee_index: self.committee_index as usize, - }); + if self.attestation_duty.slot == slot { + attestation_duty = Some(self.attestation_duty) } if produce_block | attestation_duty.is_some() { @@ -55,11 +50,25 @@ impl EpochDuty { None } } -/// Maps a list of public keys (many validators) to an EpochDuty. + +impl fmt::Display for EpochDuty { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut display_block = String::from("None"); + if let Some(block_slot) = self.block_production_slot { + display_block = block_slot.to_string(); + } + write!( + f, + "produce block slot: {}, attestation slot: {}, attestation shard: {}", + display_block, self.attestation_duty.slot, self.attestation_duty.shard + ) + } +} + +/// Maps a list of keypairs (many validators) to an EpochDuty. pub type EpochDuties = HashMap>; pub enum EpochDutiesMapError { - Poisoned, UnknownEpoch, UnknownValidator, } @@ -98,7 +107,7 @@ impl EpochDutiesMap { pub fn is_work_slot( &self, slot: Slot, - pubkey: &PublicKey, + signer: &PublicKey, ) -> Result, EpochDutiesMapError> { let epoch = slot.epoch(self.slots_per_epoch); @@ -106,7 +115,7 @@ impl EpochDutiesMap { .map .get(&epoch) .ok_or_else(|| EpochDutiesMapError::UnknownEpoch)?; - if let Some(epoch_duty) = epoch_duties.get(pubkey) { + if let Some(epoch_duty) = epoch_duties.get(signer) { if let Some(duty) = epoch_duty { // Retrieves the duty for a validator at a given slot return Ok(duty.is_work_slot(slot)); diff --git a/validator_client/src/duties/grpc.rs b/validator_client/src/duties/grpc.rs index 511ffa34a..58fb5c992 100644 --- a/validator_client/src/duties/grpc.rs +++ b/validator_client/src/duties/grpc.rs @@ -1,26 +1,27 @@ +use super::beacon_node_duties::{BeaconNodeDuties, BeaconNodeDutiesError}; use super::epoch_duties::{EpochDuties, EpochDuty}; -use super::traits::{BeaconNode, BeaconNodeError}; -use grpcio::CallOption; +// to use if we manually specify a timeout +//use grpcio::CallOption; use protos::services::{GetDutiesRequest, Validators}; use protos::services_grpc::ValidatorServiceClient; use ssz::ssz_encode; use std::collections::HashMap; -use std::time::Duration; -use types::{Epoch, PublicKey, Slot}; +// use std::time::Duration; +use types::{AttestationDuty, Epoch, PublicKey, Slot}; -impl BeaconNode for ValidatorServiceClient { +impl BeaconNodeDuties for ValidatorServiceClient { /// Requests all duties (block signing and committee attesting) from the Beacon Node (BN). fn request_duties( &self, epoch: Epoch, - pubkeys: &[PublicKey], - ) -> Result { + pub_keys: &[PublicKey], + ) -> Result { // Get the required duties from all validators // build the request let mut req = GetDutiesRequest::new(); req.set_epoch(epoch.as_u64()); let mut validators = Validators::new(); - validators.set_public_keys(pubkeys.iter().map(|v| ssz_encode(v)).collect()); + validators.set_public_keys(pub_keys.iter().map(|v| ssz_encode(v)).collect()); req.set_validators(validators); // set a timeout for requests @@ -29,13 +30,13 @@ impl BeaconNode for ValidatorServiceClient { // send the request, get the duties reply let reply = self .get_validator_duties(&req) - .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + .map_err(|err| BeaconNodeDutiesError::RemoteFailure(format!("{:?}", err)))?; let mut epoch_duties: HashMap> = HashMap::new(); for (index, validator_duty) in reply.get_active_validators().iter().enumerate() { if !validator_duty.has_duty() { // validator is inactive - epoch_duties.insert(pubkeys[index].clone(), None); + epoch_duties.insert(pub_keys[index].clone(), None); continue; } // active validator @@ -47,13 +48,19 @@ impl BeaconNode for ValidatorServiceClient { None } }; + + let attestation_duty = AttestationDuty { + slot: Slot::from(active_duty.get_attestation_slot()), + shard: active_duty.get_attestation_shard(), + committee_index: active_duty.get_committee_index() as usize, + committee_len: active_duty.get_committee_len() as usize, + }; + let epoch_duty = EpochDuty { block_production_slot, - attestation_slot: Slot::from(active_duty.get_attestation_slot()), - attestation_shard: active_duty.get_attestation_shard(), - committee_index: active_duty.get_committee_index(), + attestation_duty, }; - epoch_duties.insert(pubkeys[index].clone(), Some(epoch_duty)); + epoch_duties.insert(pub_keys[index].clone(), Some(epoch_duty)); } Ok(epoch_duties) } diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs index 0e962053e..7db4672e3 100644 --- a/validator_client/src/duties/mod.rs +++ b/validator_client/src/duties/mod.rs @@ -1,15 +1,17 @@ +mod beacon_node_duties; mod epoch_duties; mod grpc; // TODO: reintroduce tests //#[cfg(test)] //mod test_node; -mod traits; +pub use self::beacon_node_duties::{BeaconNodeDuties, BeaconNodeDutiesError}; use self::epoch_duties::{EpochDuties, EpochDutiesMapError}; pub use self::epoch_duties::{EpochDutiesMap, WorkInfo}; -use self::traits::{BeaconNode, BeaconNodeError}; +use super::signer::Signer; use futures::Async; use slog::{debug, error, info}; +use std::fmt::Display; use std::sync::Arc; use std::sync::RwLock; use types::{Epoch, PublicKey, Slot}; @@ -28,8 +30,7 @@ pub enum UpdateOutcome { #[derive(Debug, PartialEq)] pub enum Error { DutiesMapPoisoned, - EpochMapPoisoned, - BeaconNodeError(BeaconNodeError), + BeaconNodeDutiesError(BeaconNodeDutiesError), UnknownEpoch, UnknownValidator, } @@ -38,19 +39,20 @@ pub enum Error { /// Node. /// /// This keeps track of all validator keys and required voting slots. -pub struct DutiesManager { +pub struct DutiesManager { pub duties_map: RwLock, - /// A list of all public keys known to the validator service. - pub pubkeys: Vec, + /// A list of all signer objects known to the validator service. + pub signers: Arc>, pub beacon_node: Arc, } -impl DutiesManager { +impl DutiesManager { /// Check the Beacon Node for `EpochDuties`. /// /// be a wall-clock (e.g., system time, remote server time, etc.). fn update(&self, epoch: Epoch) -> Result { - let duties = self.beacon_node.request_duties(epoch, &self.pubkeys)?; + let public_keys: Vec = self.signers.iter().map(|s| s.to_public()).collect(); + let duties = self.beacon_node.request_duties(epoch, &public_keys)?; { // If these duties were known, check to see if they're updates or identical. if let Some(known_duties) = self.duties_map.read()?.get(&epoch) { @@ -67,7 +69,7 @@ impl DutiesManager { // duties have changed //TODO: Duties could be large here. Remove from display and avoid the clone. self.duties_map.write()?.insert(epoch, duties.clone()); - return Ok(UpdateOutcome::DutiesChanged(epoch, duties)); + Ok(UpdateOutcome::DutiesChanged(epoch, duties)) } /// A future wrapping around `update()`. This will perform logic based upon the update @@ -82,25 +84,27 @@ impl DutiesManager { info!(log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties)) } Ok(UpdateOutcome::NewDuties(epoch, duties)) => { - info!(log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties)) + info!(log, "New duties obtained"; "epoch" => epoch); + print_duties(&log, duties); } }; Ok(Async::Ready(())) } - /// Returns a list of (Public, WorkInfo) indicating all the validators that have work to perform + /// Returns a list of (index, WorkInfo) indicating all the validators that have work to perform /// this slot. - pub fn get_current_work(&self, slot: Slot) -> Option> { - let mut current_work: Vec<(PublicKey, WorkInfo)> = Vec::new(); + pub fn get_current_work(&self, slot: Slot) -> Option> { + let mut current_work: Vec<(usize, WorkInfo)> = Vec::new(); // if the map is poisoned, return None let duties = self.duties_map.read().ok()?; - for validator_pk in &self.pubkeys { - match duties.is_work_slot(slot, &validator_pk) { - Ok(Some(work_type)) => current_work.push((validator_pk.clone(), work_type)), + for (index, validator_signer) in self.signers.iter().enumerate() { + match duties.is_work_slot(slot, &validator_signer.to_public()) { + Ok(Some(work_type)) => current_work.push((index, work_type)), Ok(None) => {} // No work for this validator - Err(_) => {} // Unknown epoch or validator, no work + //TODO: This should really log an error, as we shouldn't end up with an err here. + Err(_) => {} // Unknown epoch or validator, no work } } if current_work.is_empty() { @@ -111,9 +115,9 @@ impl DutiesManager { } //TODO: Use error_chain to handle errors -impl From for Error { - fn from(e: BeaconNodeError) -> Error { - Error::BeaconNodeError(e) +impl From for Error { + fn from(e: BeaconNodeDutiesError) -> Error { + Error::BeaconNodeDutiesError(e) } } @@ -126,13 +130,22 @@ impl From> for Error { impl From for Error { fn from(e: EpochDutiesMapError) -> Error { match e { - EpochDutiesMapError::Poisoned => Error::EpochMapPoisoned, EpochDutiesMapError::UnknownEpoch => Error::UnknownEpoch, EpochDutiesMapError::UnknownValidator => Error::UnknownValidator, } } } +fn print_duties(log: &slog::Logger, duties: EpochDuties) { + for (pk, duty) in duties.iter() { + if let Some(display_duty) = duty { + info!(log, "Validator: {}",pk; "Duty" => format!("{}",display_duty)); + } else { + info!(log, "Validator: {}",pk; "Duty" => "None"); + } + } +} + /* TODO: Modify tests for new Duties Manager form #[cfg(test)] mod tests { diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index d044030fe..7a353e0dc 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -1,14 +1,17 @@ -mod attester_service; -mod block_producer_service; +mod attestation_producer; +mod block_producer; mod config; mod duties; pub mod error; mod service; +mod signer; use crate::config::Config as ValidatorClientConfig; use clap::{App, Arg}; +use protos::services_grpc::ValidatorServiceClient; use service::Service as ValidatorService; use slog::{error, info, o, Drain}; +use types::Keypair; fn main() { // Logging @@ -52,7 +55,8 @@ fn main() { .expect("Unable to build a configuration for the validator client."); // start the validator service. - match ValidatorService::start(config, log.clone()) { + // this specifies the GRPC and signer type to use as the duty manager beacon node. + match ValidatorService::::start(config, log.clone()) { Ok(_) => info!(log, "Validator client shutdown successfully."), Err(e) => error!(log, "Validator exited due to: {}", e.to_string()), } diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index 83e760855..38883e68f 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -1,14 +1,20 @@ -/// The validator service. Connects to a beacon node and signs blocks when required. -use crate::attester_service::{AttestationGrpcClient, AttesterService}; -use crate::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService}; +/// The Validator Client service. +/// +/// Connects to a beacon node and negotiates the correct chain id. +/// +/// Once connected, the service loads known validators keypairs from disk. Every slot, +/// the service pings the beacon node, asking for new duties for each of the validators. +/// +/// When a validator needs to either produce a block or sign an attestation, it requests the +/// data from the beacon node and performs the signing before publishing the block to the beacon +/// node. +use crate::attestation_producer::AttestationProducer; +use crate::block_producer::{BeaconBlockGrpcClient, BlockProducer}; use crate::config::Config as ValidatorConfig; -use crate::duties::UpdateOutcome; -use crate::duties::{DutiesManager, EpochDutiesMap}; +use crate::duties::{BeaconNodeDuties, DutiesManager, EpochDutiesMap}; use crate::error as error_chain; use crate::error::ErrorKind; -use attester::test_utils::EpochMap; -use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester}; -use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer}; +use crate::signer::Signer; use bls::Keypair; use grpcio::{ChannelBuilder, EnvBuilder}; use protos::services::Empty; @@ -16,65 +22,64 @@ use protos::services_grpc::{ AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient, ValidatorServiceClient, }; -use slog::{debug, error, info, warn}; +use slog::{error, info, warn}; use slot_clock::{SlotClock, SystemTimeSlotClock}; use std::sync::Arc; use std::sync::RwLock; use std::time::{Duration, Instant, SystemTime}; use tokio::prelude::*; use tokio::runtime::Builder; -use tokio::timer::Interval; +use tokio::timer::{Delay, Interval}; use tokio_timer::clock::Clock; use types::test_utils::generate_deterministic_keypairs; -use types::{Epoch, Fork, Slot}; +use types::{ChainSpec, Epoch, Fork, Slot}; -//TODO: This service should be simplified in the future. Can be made more steamlined. +/// A fixed amount of time after a slot to perform operations. This gives the node time to complete +/// per-slot processes. +const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(200); /// The validator service. This is the main thread that executes and maintains validator /// duties. -pub struct Service { - /// The node we currently connected to. - connected_node_version: String, - /// The chain id we are processing on. - chain_id: u16, - /// The fork state we processing on. +//TODO: Generalize the BeaconNode types to use testing +pub struct Service { + /// The node's current fork version we are processing on. fork: Fork, /// The slot clock for this service. slot_clock: SystemTimeSlotClock, /// The current slot we are processing. current_slot: Slot, - /// The number of slots per epoch to allow for converting slots to epochs. - slots_per_epoch: u64, + /// The chain specification for this clients instance. + spec: Arc, + /// The duties manager which maintains the state of when to perform actions. + duties_manager: Arc>, // GRPC Clients /// The beacon block GRPC client. - beacon_block_client: Arc, - /// The validator GRPC client. - validator_client: Arc, + beacon_block_client: Arc, /// The attester GRPC client. - attester_client: Arc, + attestation_client: Arc, /// The validator client logger. log: slog::Logger, } -impl Service { +impl Service { /// Initial connection to the beacon node to determine its properties. /// /// This tries to connect to a beacon node. Once connected, it initialised the gRPC clients /// and returns an instance of the service. fn initialize_service( - config: &ValidatorConfig, + config: ValidatorConfig, log: slog::Logger, - ) -> error_chain::Result { + ) -> error_chain::Result> { // initialise the beacon node client to check for a connection let env = Arc::new(EnvBuilder::new().build()); // Beacon node gRPC beacon node endpoints. let beacon_node_client = { let ch = ChannelBuilder::new(env.clone()).connect(&config.server); - Arc::new(BeaconNodeServiceClient::new(ch)) + BeaconNodeServiceClient::new(ch) }; - // retrieve node information + // retrieve node information and validate the beacon node let node_info = loop { match beacon_node_client.info(&Empty::new()) { Err(e) => { @@ -84,18 +89,27 @@ impl Service { continue; } Ok(info) => { + // verify the node's genesis time if SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs() < info.genesis_time { - warn!( + error!( log, "Beacon Node's genesis time is in the future. No work to do.\n Exiting" ); return Err("Genesis time in the future".into()); } + // verify the node's chain id + if config.spec.chain_id != info.chain_id as u8 { + error!( + log, + "Beacon Node's genesis time is in the future. No work to do.\n Exiting" + ); + return Err(format!("Beacon node has the wrong chain id. Expected chain id: {}, node's chain id: {}", config.spec.chain_id, info.chain_id).into()); + } break info; } }; @@ -123,7 +137,9 @@ impl Service { // Beacon node gRPC beacon block endpoints. let beacon_block_client = { let ch = ChannelBuilder::new(env.clone()).connect(&config.server); - Arc::new(BeaconBlockServiceClient::new(ch)) + let beacon_block_service_client = Arc::new(BeaconBlockServiceClient::new(ch)); + // a wrapper around the service client to implement the beacon block node trait + Arc::new(BeaconBlockGrpcClient::new(beacon_block_service_client)) }; // Beacon node gRPC validator endpoints. @@ -133,7 +149,7 @@ impl Service { }; //Beacon node gRPC attester endpoints. - let attester_client = { + let attestation_client = { let ch = ChannelBuilder::new(env.clone()).connect(&config.server); Arc::new(AttestationServiceClient::new(ch)) }; @@ -145,27 +161,51 @@ impl Service { let current_slot = slot_clock .present_slot() - .map_err(|e| ErrorKind::SlotClockError(e))? + .map_err(ErrorKind::SlotClockError)? .expect("Genesis must be in the future"); - Ok(Self { - connected_node_version: node_info.version, - chain_id: node_info.chain_id as u16, + /* Generate the duties manager */ + + // generate keypairs + + // TODO: keypairs are randomly generated; they should be loaded from a file or generated. + // https://github.com/sigp/lighthouse/issues/160 + let keypairs = Arc::new(generate_deterministic_keypairs(8)); + + // Builds a mapping of Epoch -> Map(PublicKey, EpochDuty) + // where EpochDuty contains slot numbers and attestation data that each validator needs to + // produce work on. + let duties_map = RwLock::new(EpochDutiesMap::new(config.spec.slots_per_epoch)); + + // builds a manager which maintains the list of current duties for all known validators + // and can check when a validator needs to perform a task. + let duties_manager = Arc::new(DutiesManager { + duties_map, + // these are abstract objects capable of signing + signers: keypairs, + beacon_node: validator_client, + }); + + let spec = Arc::new(config.spec); + + Ok(Service { fork, slot_clock, current_slot, - slots_per_epoch: config.spec.slots_per_epoch, + spec, + duties_manager, beacon_block_client, - validator_client, - attester_client, + attestation_client, log, }) } /// Initialise the service then run the core thread. + // TODO: Improve handling of generic BeaconNode types, to stub grpcClient pub fn start(config: ValidatorConfig, log: slog::Logger) -> error_chain::Result<()> { // connect to the node and retrieve its properties and initialize the gRPC clients - let service = Service::initialize_service(&config, log)?; + let mut service = + Service::::initialize_service(config, log)?; // we have connected to a node and established its parameters. Spin up the core service @@ -185,144 +225,128 @@ impl Service { // set up the validator work interval - start at next slot and proceed every slot let interval = { // Set the interval to start at the next slot, and every slot after - let slot_duration = Duration::from_secs(config.spec.seconds_per_slot); + let slot_duration = Duration::from_secs(service.spec.seconds_per_slot); //TODO: Handle checked add correctly Interval::new(Instant::now() + duration_to_next_slot, slot_duration) }; - /* kick off core service */ - - // generate keypairs - - // TODO: keypairs are randomly generated; they should be loaded from a file or generated. - // https://github.com/sigp/lighthouse/issues/160 - let keypairs = Arc::new(generate_deterministic_keypairs(8)); - - /* build requisite objects to pass to core thread */ - - // Builds a mapping of Epoch -> Map(PublicKey, EpochDuty) - // where EpochDuty contains slot numbers and attestation data that each validator needs to - // produce work on. - let duties_map = RwLock::new(EpochDutiesMap::new(config.spec.slots_per_epoch)); - - // builds a manager which maintains the list of current duties for all known validators - // and can check when a validator needs to perform a task. - let manager = Arc::new(DutiesManager { - duties_map, - pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(), - beacon_node: service.validator_client.clone(), - }); - - // run the core thread + /* kick off the core service */ runtime.block_on( interval .for_each(move |_| { - let log = service.log.clone(); - - /* get the current slot and epoch */ - let current_slot = match service.slot_clock.present_slot() { - Err(e) => { - error!(log, "SystemTimeError {:?}", e); - return Ok(()); - } - Ok(slot) => slot.expect("Genesis is in the future"), - }; - - let current_epoch = current_slot.epoch(service.slots_per_epoch); - - debug_assert!( - current_slot > service.current_slot, - "The Timer should poll a new slot" - ); - - info!(log, "Processing slot: {}", current_slot.as_u64()); - - /* check for new duties */ - - let cloned_manager = manager.clone(); - let cloned_log = log.clone(); - // spawn a new thread separate to the runtime - std::thread::spawn(move || { - cloned_manager.run_update(current_epoch.clone(), cloned_log.clone()); - dbg!("Finished thread"); - }); - - /* execute any specified duties */ - - if let Some(work) = manager.get_current_work(current_slot) { - for (_public_key, work_type) in work { - if work_type.produce_block { - // TODO: Produce a beacon block in a new thread - } - if work_type.attestation_duty.is_some() { - // available AttestationDuty info - let attestation_duty = - work_type.attestation_duty.expect("Cannot be None"); - //TODO: Produce an attestation in a new thread - } - } - } - + // wait for node to process + std::thread::sleep(TIME_DELAY_FROM_SLOT); + // if a non-fatal error occurs, proceed to the next slot. + let _ignore_error = service.per_slot_execution(); + // completed a slot process Ok(()) }) .map_err(|e| format!("Service thread failed: {:?}", e)), - ); - - // completed a slot process + )?; + // validator client exited Ok(()) } - /* + /// The execution logic that runs every slot. + // Errors are logged to output, and core execution continues unless fatal errors occur. + fn per_slot_execution(&mut self) -> error_chain::Result<()> { + /* get the new current slot and epoch */ + self.update_current_slot()?; - // Spawn a new thread to perform block production for the validator. - let producer_thread = { - let spec = spec.clone(); - let signer = Arc::new(BlockProposerLocalSigner::new(keypair.clone())); - let duties_map = duties_map.clone(); - let slot_clock = slot_clock.clone(); - let log = log.clone(); - let client = Arc::new(BeaconBlockGrpcClient::new(beacon_block_grpc_client.clone())); - thread::spawn(move || { - let block_producer = - BlockProducer::new(spec, duties_map, slot_clock, client, signer); - let mut block_producer_service = BlockProducerService { - block_producer, - poll_interval_millis, - log, - }; + /* check for new duties */ + self.check_for_duties(); - block_producer_service.run(); - }) - }; + /* process any required duties for validators */ + self.process_duties(); - // Spawn a new thread for attestation for the validator. - let attester_thread = { - let signer = Arc::new(AttesterLocalSigner::new(keypair.clone())); - let epoch_map = epoch_map_for_attester.clone(); - let slot_clock = slot_clock.clone(); - let log = log.clone(); - let client = Arc::new(AttestationGrpcClient::new(attester_grpc_client.clone())); - thread::spawn(move || { - let attester = Attester::new(epoch_map, slot_clock, client, signer); - let mut attester_service = AttesterService { - attester, - poll_interval_millis, - log, - }; - - attester_service.run(); - }) - }; - - threads.push((duties_manager_thread, producer_thread, attester_thread)); + Ok(()) } - // Naively wait for all the threads to complete. - for tuple in threads { - let (manager, producer, attester) = tuple; - let _ = producer.join(); - let _ = manager.join(); - let _ = attester.join(); + /// Updates the known current slot and epoch. + fn update_current_slot(&mut self) -> error_chain::Result<()> { + let current_slot = match self.slot_clock.present_slot() { + Err(e) => { + error!(self.log, "SystemTimeError {:?}", e); + return Err("Could not read system time".into()); + } + Ok(slot) => slot.expect("Genesis is in the future"), + }; + + let current_epoch = current_slot.epoch(self.spec.slots_per_epoch); + + // this is a fatal error. If the slot clock repeats, there is something wrong with + // the timer, terminate immediately. + assert!( + current_slot > self.current_slot, + "The Timer should poll a new slot" + ); + self.current_slot = current_slot; + info!(self.log, "Processing"; "slot" => current_slot.as_u64(), "epoch" => current_epoch.as_u64()); + Ok(()) + } + + /// For all known validator keypairs, update any known duties from the beacon node. + fn check_for_duties(&mut self) { + let cloned_manager = self.duties_manager.clone(); + let cloned_log = self.log.clone(); + let current_epoch = self.current_slot.epoch(self.spec.slots_per_epoch); + // spawn a new thread separate to the runtime + // TODO: Handle thread termination/timeout + std::thread::spawn(move || { + // the return value is a future which returns ready. + // built to be compatible with the tokio runtime. + let _empty = cloned_manager.run_update(current_epoch, cloned_log.clone()); + }); + } + + /// If there are any duties to process, spawn a separate thread and perform required actions. + fn process_duties(&mut self) { + if let Some(work) = self.duties_manager.get_current_work(self.current_slot) { + for (signer_index, work_type) in work { + if work_type.produce_block { + // we need to produce a block + // spawns a thread to produce a beacon block + let signers = self.duties_manager.signers.clone(); // this is an arc + let fork = self.fork.clone(); + let slot = self.current_slot; + let spec = self.spec.clone(); + let beacon_node = self.beacon_block_client.clone(); + let log = self.log.clone(); + std::thread::spawn(move || { + info!(log, "Producing a block"; "Validator"=> format!("{}", signers[signer_index])); + let signer = &signers[signer_index]; + let mut block_producer = BlockProducer { + fork, + slot, + spec, + beacon_node, + signer, + }; + block_producer.handle_produce_block(log); + }); + } + if work_type.attestation_duty.is_some() { + // we need to produce an attestation + // spawns a thread to produce and sign an attestation + let signers = self.duties_manager.signers.clone(); // this is an arc + let fork = self.fork.clone(); + let spec = self.spec.clone(); + let beacon_node = self.attestation_client.clone(); + let log = self.log.clone(); + std::thread::spawn(move || { + info!(log, "Producing an attestation"; "Validator"=> format!("{}", signers[signer_index])); + let signer = &signers[signer_index]; + let mut attestation_producer = AttestationProducer { + fork, + duty: work_type.attestation_duty.expect("Should never be none"), + spec, + beacon_node, + signer, + }; + attestation_producer.handle_produce_attestation(log); + }); + } + } + } } - */ } diff --git a/validator_client/src/signer.rs b/validator_client/src/signer.rs new file mode 100644 index 000000000..018142322 --- /dev/null +++ b/validator_client/src/signer.rs @@ -0,0 +1,21 @@ +use std::fmt::Display; +use types::{Keypair, PublicKey, Signature}; + +/// Signs message using an internally-maintained private key. +pub trait Signer: Display + Send + Sync + Clone { + fn sign_message(&self, message: &[u8], domain: u64) -> Option; + /// Returns a public key for the signer object. + fn to_public(&self) -> PublicKey; +} + +/* Implements Display and Signer for Keypair */ + +impl Signer for Keypair { + fn to_public(&self) -> PublicKey { + self.pk.clone() + } + + fn sign_message(&self, message: &[u8], domain: u64) -> Option { + Some(Signature::new(message, domain, &self.sk)) + } +}