diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 27398b6c9..a9a28ea39 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -479,7 +479,7 @@ where } /// Produce an `AttestationData` that is valid for the present `slot` and given `shard`. - pub fn produce_attestation(&self, shard: u64) -> Result { + pub fn produce_attestation_data(&self, shard: u64) -> Result { trace!("BeaconChain::produce_attestation: shard: {}", shard); let source_epoch = self.state.read().current_justified_epoch; let source_root = *self.state.read().get_block_root( diff --git a/beacon_node/beacon_chain/test_harness/src/validator_harness/direct_beacon_node.rs b/beacon_node/beacon_chain/test_harness/src/validator_harness/direct_beacon_node.rs index fde8211ab..2d2b9e84d 100644 --- a/beacon_node/beacon_chain/test_harness/src/validator_harness/direct_beacon_node.rs +++ b/beacon_node/beacon_chain/test_harness/src/validator_harness/direct_beacon_node.rs @@ -50,12 +50,12 @@ impl DirectBeaconNode { } impl AttesterBeaconNode for DirectBeaconNode { - fn produce_attestation( + fn produce_attestation_data( &self, _slot: Slot, shard: u64, ) -> Result, NodeError> { - match self.beacon_chain.produce_attestation(shard) { + match self.beacon_chain.produce_attestation_data(shard) { Ok(attestation_data) => Ok(Some(attestation_data)), Err(e) => Err(NodeError::RemoteFailure(format!("{:?}", e))), } diff --git a/beacon_node/rpc/src/beacon_attester.rs b/beacon_node/rpc/src/beacon_attester.rs index 36b6a40b2..4166c30d4 100644 --- a/beacon_node/rpc/src/beacon_attester.rs +++ b/beacon_node/rpc/src/beacon_attester.rs @@ -1,45 +1,63 @@ +use crate::beacon_chain::BeaconChain; use futures::Future; -use grpcio::{RpcContext, UnarySink}; +use grpcio::{RpcContext, UnarySink, RpcStatus, RpcStatusCode}; use protos::services::{ - Attestation as AttestationProto, ProduceAttestation, ProduceAttestationResponse, - ProduceAttestationRequest, PublishAttestationResponse, PublishAttestationRequest, + AttestationData as AttestationDataProto, ProduceAttestationData, ProduceAttestationDataResponse, + ProduceAttestationDataRequest, PublishAttestationResponse, PublishAttestationRequest, PublishAttestation }; use protos::services_grpc::BeaconBlockService; -use slog::Logger; +use slog::{Logger, info, warn, error}; + +const TEST_SHARD_PHASE_ZERO: u8 = 0; #[derive(Clone)] pub struct AttestationServiceInstance { + pub chain: Arc, pub log: Logger, } impl AttestationService for AttestationServiceInstance { - /// Produce a `BeaconBlock` for signing by a validator. - fn produce_attestation( + /// Produce the `AttestationData` for signing by a validator. + fn produce_attestation_data( &mut self, ctx: RpcContext, - req: ProduceAttestationRequest, - sink: UnarySink, + req: ProduceAttestationDataRequest, + sink: UnarySink, ) { - println!("producing attestation at slot {}", req.get_slot()); + info!(&self.log, "Attempting to produce attestation at slot {}", req.get_slot()); - // TODO: build a legit block. - let mut attestation = AttestationProto::new(); - attestation.set_slot(req.get_slot()); - // TODO Set the shard to something legit. - attestation.set_shard(0); - attestation.set_block_root(b"cats".to_vec()); + // get the chain spec & state + let spec = self.chain.get_spec(); + let state = self.chain.get_state(); - let mut resp = ProduceAttestationResponse::new(); - resp.set_attestation_data(attestation); + let slot_requested = req.get_slot(); + + // Start by performing some checks + // Check that the the AttestionData is for the current slot (otherwise it will not be valid) + if slot_requested != state.slot { + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::OutOfRange, + "AttestationData request for a slot that is not the current slot." + )) + .map_err(move |e| error!(&self.log, "Failed to reply with failure {:?}: {:?}", req, e)); + } + + // Then get the AttestationData from the beacon chain (for shard 0 for now) + let attestation_data = self.chain.produce_attestation_data(TEST_SHARD_PHASE_ZERO); + + let mut resp = ProduceAttestationDataResponse::new(); + resp.set_attestation_data(attestation_data); let f = sink .success(resp) - .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + .map_err(move |e| error!("Failed to reply with success {:?}: {:?}", req, e)); ctx.spawn(f) } - /// Accept some fully-formed `BeaconBlock`, process and publish it. + /// Accept some fully-formed `FreeAttestation` from the validator, + /// store it, and aggregate it into an `Attestation`. fn publish_attestation( &mut self, ctx: RpcContext, diff --git a/eth2/attester/src/lib.rs b/eth2/attester/src/lib.rs index 270c1e4d7..a4295f005 100644 --- a/eth2/attester/src/lib.rs +++ b/eth2/attester/src/lib.rs @@ -94,7 +94,7 @@ impl Attester Result { - let attestation_data = match self.beacon_node.produce_attestation(slot, shard)? { + let attestation_data = match self.beacon_node.produce_attestation_data(slot, shard)? { Some(attestation_data) => attestation_data, None => return Ok(PollOutcome::BeaconNodeUnableToProduceAttestation(slot)), }; diff --git a/eth2/attester/src/test_utils/simulated_beacon_node.rs b/eth2/attester/src/test_utils/simulated_beacon_node.rs index 84a203cdb..d19f43422 100644 --- a/eth2/attester/src/test_utils/simulated_beacon_node.rs +++ b/eth2/attester/src/test_utils/simulated_beacon_node.rs @@ -26,7 +26,7 @@ impl SimulatedBeaconNode { } impl BeaconNode for SimulatedBeaconNode { - fn produce_attestation(&self, slot: Slot, shard: u64) -> ProduceResult { + fn produce_attestation_data(&self, slot: Slot, shard: u64) -> ProduceResult { *self.produce_input.write().unwrap() = Some((slot, shard)); match *self.produce_result.read().unwrap() { Some(ref r) => r.clone(), diff --git a/eth2/attester/src/traits.rs b/eth2/attester/src/traits.rs index 749c6e1a2..2fd6940af 100644 --- a/eth2/attester/src/traits.rs +++ b/eth2/attester/src/traits.rs @@ -14,7 +14,7 @@ pub enum PublishOutcome { /// Defines the methods required to produce and publish blocks on a Beacon Node. pub trait BeaconNode: Send + Sync { - fn produce_attestation( + fn produce_attestation_data( &self, slot: Slot, shard: u64, diff --git a/eth2/utils/bls/src/keypair.rs b/eth2/utils/bls/src/keypair.rs index 6feb2a585..c91b13bad 100644 --- a/eth2/utils/bls/src/keypair.rs +++ b/eth2/utils/bls/src/keypair.rs @@ -1,5 +1,6 @@ use super::{PublicKey, SecretKey}; use serde_derive::{Deserialize, Serialize}; +use std::hash::{Hash, Hasher}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Keypair { @@ -19,3 +20,15 @@ impl Keypair { self.pk.concatenated_hex_id() } } + +impl Hash for Keypair { + /// Note: this is distinct from consensus serialization, it will produce a different hash. + /// + /// This method uses the uncompressed bytes, which are much faster to obtain than the + /// compressed bytes required for consensus serialization. + /// + /// Use `ssz::Encode` to obtain the bytes required for consensus hashing. + fn hash(&self, state: &mut H) { + self.pk.as_uncompressed_bytes().hash(state) + } +} diff --git a/protos/src/services.proto b/protos/src/services.proto index e5095f386..7cd653a60 100644 --- a/protos/src/services.proto +++ b/protos/src/services.proto @@ -35,7 +35,7 @@ service ValidatorService { /// Service that handles validator attestations service AttestationService { - rpc ProduceAttestation(ProduceAttestationRequest) returns (ProduceAttestationResponse); + rpc ProduceAttestationData(ProduceAttestationDataRequest) returns (ProduceAttestationDataResponse); rpc PublishAttestation(PublishAttestationRequest) returns (PublishAttestationResponse); } @@ -131,13 +131,13 @@ message ValidatorDuty { * Attestation Service Messages */ -message ProduceAttestationRequest { +message ProduceAttestationDataRequest { uint64 slot = 1; uint64 shard = 2; } -message ProduceAttestationResponse { - Attestation attestation_data = 1; +message ProduceAttestationDataResponse { + AttestationData attestation_data = 1; } message PublishAttestationRequest { @@ -155,7 +155,7 @@ message Crosslink { } -message Attestation { +message AttestationData { uint64 slot = 1; uint64 shard = 2; bytes beacon_block_root = 3; @@ -168,7 +168,7 @@ message Attestation { } message FreeAttestation { - Attestation attestation_data = 1; + AttestationData data = 1; bytes signature = 2; uint64 validator_index = 3; } diff --git a/validator_client/src/attester_service/attestation_grpc_client.rs b/validator_client/src/attester_service/attestation_grpc_client.rs index 5a4701ba9..502e51cac 100644 --- a/validator_client/src/attester_service/attestation_grpc_client.rs +++ b/validator_client/src/attester_service/attestation_grpc_client.rs @@ -2,8 +2,8 @@ use protos::services_grpc::AttestationServiceClient; use std::sync::Arc; use attester::{BeaconNode, BeaconNodeError, PublishOutcome}; -use protos::services::ProduceAttestationRequest; -use types::{AttestationData, FreeAttestation, Slot}; +use protos::services::ProduceAttestationDataRequest; +use types::{Attestation, AttestationData, Slot}; pub struct AttestationGrpcClient { client: Arc, @@ -14,20 +14,20 @@ impl AttestationGrpcClient { Self { client } } } - +/* impl BeaconNode for AttestationGrpcClient { - fn produce_attestation( + fn produce_attestation_data( &self, slot: Slot, shard: u64, ) -> Result, BeaconNodeError> { - let mut req = ProduceAttestationRequest::new(); + let mut req = ProduceAttestationDataRequest::new(); req.set_slot(slot.as_u64()); req.set_shard(shard); let reply = self .client - .produce_attestation(&req) + .produce_attestation_data(&req) .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; // TODO: return correct Attestation @@ -36,9 +36,10 @@ impl BeaconNode for AttestationGrpcClient { fn publish_attestation( &self, - free_attestation: FreeAttestation, + attestation: Attestation, ) -> Result { // TODO: return correct PublishOutcome Err(BeaconNodeError::DecodeFailure) } } +*/ diff --git a/validator_client/src/attester_service/mod.rs b/validator_client/src/attester_service/mod.rs index fe5de7647..c14669445 100644 --- a/validator_client/src/attester_service/mod.rs +++ b/validator_client/src/attester_service/mod.rs @@ -6,18 +6,22 @@ use std::time::Duration; pub use self::attestation_grpc_client::AttestationGrpcClient; -pub struct AttesterService { - pub attester: Attester, +pub struct AttesterService {} +/* +pub struct AttesterService { + // pub attester: Attester, pub poll_interval_millis: u64, pub log: Logger, } -impl AttesterService { + +impl AttesterService { /// Run a loop which polls the Attester each `poll_interval_millis` millseconds. /// /// Logs the results of the polls. pub fn run(&mut self) { loop { + /* We don't do the polling any more... match self.attester.poll() { Err(error) => { error!(self.log, "Attester poll error"; "error" => format!("{:?}", error)) @@ -47,8 +51,10 @@ impl AttesterService slot) } }; - - std::thread::sleep(Duration::from_millis(self.poll_interval_millis)); - } - } + */ +println!("Legacy polling still happening..."); +std::thread::sleep(Duration::from_millis(self.poll_interval_millis)); } +} +} +*/ diff --git a/validator_client/src/duties/beacon_node_duties.rs b/validator_client/src/duties/beacon_node_duties.rs index efe9e836d..b66b5f704 100644 --- a/validator_client/src/duties/beacon_node_duties.rs +++ b/validator_client/src/duties/beacon_node_duties.rs @@ -1,5 +1,5 @@ use super::EpochDuties; -use types::{Epoch, PublicKey}; +use types::{Epoch, Keypair}; #[derive(Debug, PartialEq, Clone)] pub enum BeaconNodeDutiesError { @@ -15,6 +15,6 @@ pub trait BeaconNodeDuties: Send + Sync { fn request_duties( &self, epoch: Epoch, - pubkeys: &[PublicKey], + signers: &[Keypair], ) -> Result; } diff --git a/validator_client/src/duties/epoch_duties.rs b/validator_client/src/duties/epoch_duties.rs index f17737789..8e710ba9a 100644 --- a/validator_client/src/duties/epoch_duties.rs +++ b/validator_client/src/duties/epoch_duties.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::fmt; use std::ops::{Deref, DerefMut}; -use types::{AttestationDuty, Epoch, PublicKey, Slot}; +use types::{AttestationDuty, Epoch, Keypair, Slot}; /// When work needs to be performed by a validator, this type is given back to the main service /// which indicates all the information that required to process the work. @@ -71,8 +71,8 @@ impl fmt::Display for EpochDuty { } } -/// Maps a list of public keys (many validators) to an EpochDuty. -pub type EpochDuties = HashMap>; +/// Maps a list of keypairs (many validators) to an EpochDuty. +pub type EpochDuties = HashMap>; pub enum EpochDutiesMapError { UnknownEpoch, @@ -113,7 +113,7 @@ impl EpochDutiesMap { pub fn is_work_slot( &self, slot: Slot, - pubkey: &PublicKey, + signer: &Keypair, ) -> Result, EpochDutiesMapError> { let epoch = slot.epoch(self.slots_per_epoch); @@ -121,7 +121,7 @@ impl EpochDutiesMap { .map .get(&epoch) .ok_or_else(|| EpochDutiesMapError::UnknownEpoch)?; - if let Some(epoch_duty) = epoch_duties.get(pubkey) { + if let Some(epoch_duty) = epoch_duties.get(signer) { if let Some(duty) = epoch_duty { // Retrieves the duty for a validator at a given slot return Ok(duty.is_work_slot(slot)); diff --git a/validator_client/src/duties/grpc.rs b/validator_client/src/duties/grpc.rs index f05307141..d6e2e5238 100644 --- a/validator_client/src/duties/grpc.rs +++ b/validator_client/src/duties/grpc.rs @@ -6,21 +6,21 @@ use protos::services_grpc::ValidatorServiceClient; use ssz::ssz_encode; use std::collections::HashMap; use std::time::Duration; -use types::{Epoch, PublicKey, Slot}; +use types::{Epoch, Keypair, Slot}; impl BeaconNodeDuties for ValidatorServiceClient { /// Requests all duties (block signing and committee attesting) from the Beacon Node (BN). fn request_duties( &self, epoch: Epoch, - pubkeys: &[PublicKey], + signers: &[Keypair], ) -> Result { // Get the required duties from all validators // build the request let mut req = GetDutiesRequest::new(); req.set_epoch(epoch.as_u64()); let mut validators = Validators::new(); - validators.set_public_keys(pubkeys.iter().map(|v| ssz_encode(v)).collect()); + validators.set_public_keys(signers.iter().map(|v| ssz_encode(&v.pk)).collect()); req.set_validators(validators); // set a timeout for requests @@ -31,11 +31,11 @@ impl BeaconNodeDuties for ValidatorServiceClient { .get_validator_duties(&req) .map_err(|err| BeaconNodeDutiesError::RemoteFailure(format!("{:?}", err)))?; - let mut epoch_duties: HashMap> = HashMap::new(); + let mut epoch_duties: HashMap> = HashMap::new(); for (index, validator_duty) in reply.get_active_validators().iter().enumerate() { if !validator_duty.has_duty() { // validator is inactive - epoch_duties.insert(pubkeys[index].clone(), None); + epoch_duties.insert(signers[index].clone(), None); continue; } // active validator @@ -53,7 +53,7 @@ impl BeaconNodeDuties for ValidatorServiceClient { attestation_shard: active_duty.get_attestation_shard(), committee_index: active_duty.get_committee_index(), }; - epoch_duties.insert(pubkeys[index].clone(), Some(epoch_duty)); + epoch_duties.insert(signers[index].clone(), Some(epoch_duty)); } Ok(epoch_duties) } diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs index c16fc81fc..1019f489d 100644 --- a/validator_client/src/duties/mod.rs +++ b/validator_client/src/duties/mod.rs @@ -12,7 +12,7 @@ use futures::Async; use slog::{debug, error, info}; use std::sync::Arc; use std::sync::RwLock; -use types::{Epoch, PublicKey, Slot}; +use types::{Epoch, Keypair, Slot}; #[derive(Debug, PartialEq, Clone)] pub enum UpdateOutcome { @@ -40,8 +40,9 @@ pub enum Error { /// This keeps track of all validator keys and required voting slots. pub struct DutiesManager { pub duties_map: RwLock, - /// A list of all public keys known to the validator service. - pub pubkeys: Vec, + /// A list of all signer objects known to the validator service. + // TODO: Generalise the signers, so that they're not just keypairs + pub signers: Arc>, pub beacon_node: Arc, } @@ -50,7 +51,7 @@ impl DutiesManager { /// /// be a wall-clock (e.g., system time, remote server time, etc.). fn update(&self, epoch: Epoch) -> Result { - let duties = self.beacon_node.request_duties(epoch, &self.pubkeys)?; + let duties = self.beacon_node.request_duties(epoch, &self.signers)?; { // If these duties were known, check to see if they're updates or identical. if let Some(known_duties) = self.duties_map.read()?.get(&epoch) { @@ -91,17 +92,18 @@ impl DutiesManager { /// Returns a list of (Public, WorkInfo) indicating all the validators that have work to perform /// this slot. - pub fn get_current_work(&self, slot: Slot) -> Option> { - let mut current_work: Vec<(PublicKey, WorkInfo)> = Vec::new(); + pub fn get_current_work(&self, slot: Slot) -> Option> { + let mut current_work: Vec<(Keypair, WorkInfo)> = Vec::new(); // if the map is poisoned, return None let duties = self.duties_map.read().ok()?; - for validator_pk in &self.pubkeys { - match duties.is_work_slot(slot, &validator_pk) { - Ok(Some(work_type)) => current_work.push((validator_pk.clone(), work_type)), + for validator_signer in self.signers.iter() { + match duties.is_work_slot(slot, &validator_signer) { + Ok(Some(work_type)) => current_work.push((validator_signer.clone(), work_type)), Ok(None) => {} // No work for this validator - Err(_) => {} // Unknown epoch or validator, no work + //TODO: This should really log an error, as we shouldn't end up with an err here. + Err(_) => {} // Unknown epoch or validator, no work } } if current_work.is_empty() { @@ -136,9 +138,9 @@ impl From for Error { fn print_duties(log: &slog::Logger, duties: EpochDuties) { for (pk, duty) in duties.iter() { if let Some(display_duty) = duty { - info!(log, "Validator: {}",pk; "Duty" => format!("{}",display_duty)); + info!(log, "Validator: {:?}",pk; "Duty" => format!("{}",display_duty)); } else { - info!(log, "Validator: {}",pk; "Duty" => "None"); + info!(log, "Validator: {:?}",pk; "Duty" => "None"); } } } diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index bd1053a34..d92d944cb 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -27,6 +27,7 @@ use slog::{debug, error, info, warn}; use slot_clock::{SlotClock, SystemTimeSlotClock}; use std::sync::Arc; use std::sync::RwLock; +use std::thread; use std::time::{Duration, Instant, SystemTime}; use tokio::prelude::*; use tokio::runtime::Builder; @@ -37,6 +38,8 @@ use types::{ChainSpec, Epoch, Fork, Slot}; //TODO: This service should be simplified in the future. Can be made more steamlined. +const POLL_INTERVAL_MILLIS: u64 = 100; + /// The validator service. This is the main thread that executes and maintains validator /// duties. //TODO: Generalize the BeaconNode types to use testing @@ -180,7 +183,7 @@ impl Service { // and can check when a validator needs to perform a task. let duties_manager = Arc::new(DutiesManager { duties_map, - pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(), + signers: keypairs, beacon_node: validator_client, }); @@ -314,8 +317,21 @@ impl Service { } if work_type.attestation_duty.is_some() { // available AttestationDuty info - let attestation_duty = work_type.attestation_duty.expect("Cannot be None"); - //TODO: Produce an attestation in a new thread + /* + let attestation_duty = + work_type.attestation_duty.expect("Cannot be None"); + let attester_grpc_client = Arc::new(AttestationGrpcClient::new( + service.attester_client.clone(), + )); + let signer = Arc::new(AttesterLocalSigner::new(keypair.clone())); + let attester = Attester::new(attester_grpc_client, signer); + let mut attester_service = AttesterService { + attester, + poll_interval_millis: POLL_INTERVAL_MILLIS, + log: log.clone(), + }; + attester_service.run(); + */ } } }