diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index 4dfd33487..20cd62b1d 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -49,7 +49,10 @@ pub fn start_server( create_beacon_block_service(instance) }; let validator_service = { - let instance = ValidatorServiceInstance { log: log.clone() }; + let instance = ValidatorServiceInstance { + chain: beacon_chain.clone(), + log: log.clone(), + }; create_validator_service(instance) }; diff --git a/beacon_node/rpc/src/validator.rs b/beacon_node/rpc/src/validator.rs index f894deca6..8071fac5c 100644 --- a/beacon_node/rpc/src/validator.rs +++ b/beacon_node/rpc/src/validator.rs @@ -2,59 +2,132 @@ use bls::PublicKey; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use protos::services::{ - IndexResponse, ProposeBlockSlotRequest, ProposeBlockSlotResponse, PublicKey as PublicKeyRequest, -}; + GetDutiesRequest, GetDutiesResponse, Validators}; use protos::services_grpc::ValidatorService; use slog::{debug, Logger}; use ssz::Decodable; +use std::sync::Arc; +use crate::beacon_chain::BeaconChain; #[derive(Clone)] pub struct ValidatorServiceInstance { + pub chain: Arc, pub log: Logger, } +//TODO: Refactor Errors impl ValidatorService for ValidatorServiceInstance { - fn validator_index( + + /// For a list of validator public keys, this function returns the slot at which each + /// validator must propose a block, attest to a shard, their shard committee and the shard they + /// need to attest to. + fn get_validator_duties ( &mut self, ctx: RpcContext, - req: PublicKeyRequest, - sink: UnarySink, + req: GetDutiesRequest, + sink: UnarySink, ) { - if let Ok((public_key, _)) = PublicKey::ssz_decode(req.get_public_key(), 0) { - debug!(self.log, "RPC request"; "endpoint" => "ValidatorIndex", "public_key" => public_key.concatenated_hex_id()); + let validators = req.get_validators(); + debug!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch()); - let mut resp = IndexResponse::new(); + let epoch = req.get_epoch(); + let mut resp = GetDutiesResponse::new(); - // TODO: return a legit value. - resp.set_index(1); + let spec = self.chain.spec; + let state = self.chain.state.read(); - let f = sink - .success(resp) - .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); - ctx.spawn(f) - } else { - let f = sink + //TODO: Decide whether to rebuild the cache + //TODO: Get the active validator indicies + //let active_validator_indices = self.chain.state.read().get_cached_active_validator_indices( + let active_validator_indices = &[1,2,3,4,5,6,7,8]; + // TODO: Is this the most efficient? Perhaps we cache this data structure. + + // this is an array of validators who are to propose this epoch + // TODO: RelativeEpoch? + let validator_proposers = 0..spec.slots_per_epoch.to_iter().map(|slot| state.get_beacon_proposer_index(slot, epoch, &spec)).collect(); + + // get the duties for each validator + for validator in validators { + let active_validator = ActiveValidator::new(); + + + let public_key = match PublicKey::ssz_decode(validator, 0) { + Ok((v_) => v, + Err(_) => { + let f = sink .fail(RpcStatus::new( RpcStatusCode::InvalidArgument, Some("Invalid public_key".to_string()), )) + //TODO: Handle error correctly .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); - ctx.spawn(f) - } - } + return ctx.spawn(f); + } + }; - fn propose_block_slot( - &mut self, - ctx: RpcContext, - req: ProposeBlockSlotRequest, - sink: UnarySink, - ) { - debug!(self.log, "RPC request"; "endpoint" => "ProposeBlockSlot", "epoch" => req.get_epoch(), "validator_index" => req.get_validator_index()); + // is the validator active + let val_index = match state.get_validator_index(&public_key) { + Ok(index) => { + if active_validator_indices.contains(index) { + // validator is active, return the index + index + } + else { + // validator is inactive, go to the next validator + active_validator.set_none(); + resp.push(active_validator); + break; + } + }, + // the cache is not built, throw an error + Err(_) =>{ + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::FailedPreCondition, + Some("Beacon state cache is not built".to_string()), + )) + //TODO: Handle error correctly + .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + return ctx.spawn(f); + } + }; - let mut resp = ProposeBlockSlotResponse::new(); + // we have an active validator, set its duties + let duty = ValidatorDuty::new(); + + // check if it needs to propose a block + let Some(slot) = validator_proposers.iter().position(|&v| val_index ==v) { + duty.set_block_production_slot(slot); + } + else { + // no blocks to propose this epoch + duty.set_none() + } + + // get attestation duties + let attestation_duties = match state.get_attestation_duties(val_index, &spec) { + Ok(v) => v, + // the cache is not built, throw an error + Err(_) =>{ + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::FailedPreCondition, + Some("Beacon state cache is not built".to_string()), + )) + //TODO: Handle error correctly + .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + return ctx.spawn(f); + } + }; + + duty.set_committee_index(attestation_duties.committee_index); + duty.set_attestation_slot(attestation_duties.slot); + duty.set_attestation_shard(attestation_duties.shard); + + active_validator.set_duty(duty); + resp.push(active_validator); + } - // TODO: return a legit value. - resp.set_slot(1); let f = sink .success(resp) diff --git a/protos/src/services.proto b/protos/src/services.proto index 7a4bbc977..82a2703e6 100644 --- a/protos/src/services.proto +++ b/protos/src/services.proto @@ -122,8 +122,8 @@ message ValidatorDuty { bool none = 1; uint64 block_production_slot = 2; } - uint64 committee_slot = 3; - uint64 committee_shard = 4; + uint64 attestation_slot = 3; + uint64 attestation_shard = 4; uint64 committee_index = 5; }