Correct validator get duties RPC server logic

This commit is contained in:
Age Manning 2019-03-28 13:14:41 +11:00
parent d3af95d1eb
commit 4caaf82892
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
2 changed files with 80 additions and 82 deletions

View File

@ -4,9 +4,10 @@ use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use protos::services::{ActiveValidator, GetDutiesRequest, GetDutiesResponse, ValidatorDuty}; use protos::services::{ActiveValidator, GetDutiesRequest, GetDutiesResponse, ValidatorDuty};
use protos::services_grpc::ValidatorService; use protos::services_grpc::ValidatorService;
use slog::{debug, Logger}; use slog::{debug, warn, Logger};
use ssz::Decodable; use ssz::Decodable;
use std::sync::Arc; use std::sync::Arc;
use types::{Epoch, RelativeEpoch};
#[derive(Clone)] #[derive(Clone)]
pub struct ValidatorServiceInstance { pub struct ValidatorServiceInstance {
@ -28,23 +29,47 @@ impl ValidatorService for ValidatorServiceInstance {
let validators = req.get_validators(); let validators = req.get_validators();
debug!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch()); debug!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch());
let epoch = req.get_epoch(); let epoch = Epoch::from(req.get_epoch());
let mut resp = GetDutiesResponse::new(); let mut resp = GetDutiesResponse::new();
let resp_validators = resp.mut_active_validators(); let resp_validators = resp.mut_active_validators();
let spec = self.chain.get_spec(); let spec = self.chain.get_spec();
let state = self.chain.get_state(); let state = self.chain.get_state();
//TODO: Decide whether to rebuild the cache let relative_epoch =
//TODO: Get the active validator indicies match RelativeEpoch::from_epoch(state.slot.epoch(spec.slots_per_epoch), epoch) {
//let active_validator_indices = self.chain.state.read().get_cached_active_validator_indices( Ok(v) => v,
let active_validator_indices = vec![1, 2, 3, 4, 5, 6, 7, 8]; Err(e) => {
// TODO: Is this the most efficient? Perhaps we cache this data structure. // incorrect epoch
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::FailedPrecondition,
Some(format!("Invalid epoch: {:?}", e)),
))
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
return ctx.spawn(f);
}
};
// this is an array of validators who are to propose this epoch let validator_proposers: Result<Vec<usize>, _> = epoch
// TODO: RelativeEpoch? .slot_iter(spec.slots_per_epoch)
//let validator_proposers = [0..spec.slots_per_epoch].iter().map(|slot| state.get_beacon_proposer_index(Slot::from(slot), epoch, &spec)).collect(); .map(|slot| state.get_beacon_proposer_index(slot, relative_epoch, &spec))
let validator_proposers: Vec<u64> = vec![1, 2, 3, 4, 5]; .collect();
let validator_proposers = match validator_proposers {
Ok(v) => v,
Err(_) => {
// could not get the validator proposer index
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::InvalidArgument,
Some("Invalid public_key".to_string()),
))
.map_err(move |e| warn!(log_clone, "failed to reply {:?} : {:?}", req, e));
return ctx.spawn(f);
}
};
// get the duties for each validator // get the duties for each validator
for validator_pk in validators.get_public_keys() { for validator_pk in validators.get_public_keys() {
@ -53,45 +78,65 @@ impl ValidatorService for ValidatorServiceInstance {
let public_key = match PublicKey::ssz_decode(validator_pk, 0) { let public_key = match PublicKey::ssz_decode(validator_pk, 0) {
Ok((v, _index)) => v, Ok((v, _index)) => v,
Err(_) => { Err(_) => {
let log_clone = self.log.clone();
let f = sink let f = sink
.fail(RpcStatus::new( .fail(RpcStatus::new(
RpcStatusCode::InvalidArgument, RpcStatusCode::InvalidArgument,
Some("Invalid public_key".to_string()), Some("Invalid public_key".to_string()),
)) ))
//TODO: Handle error correctly .map_err(move |e| warn!(log_clone, "failed to reply {:?}", req));
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
return ctx.spawn(f); return ctx.spawn(f);
} }
}; };
// is the validator active // get the validator index
let val_index = match state.get_validator_index(&public_key) { let val_index = match state.get_validator_index(&public_key) {
Ok(Some(index)) => { Ok(Some(index)) => index,
if active_validator_indices.contains(&index) { Ok(None) => {
// validator is active, return the index // index not present in registry, set the duties for this key to None
index warn!(
} else { self.log,
// validator is inactive, go to the next validator "RPC requested a public key that is not in the registry: {:?}", public_key
active_validator.set_none(false); );
resp_validators.push(active_validator);
break;
}
}
// validator index is not known, skip it
Ok(_) => {
active_validator.set_none(false); active_validator.set_none(false);
resp_validators.push(active_validator); resp_validators.push(active_validator);
break; break;
} }
// the cache is not built, throw an error // the cache is not built, throw an error
Err(_) => { Err(e) => {
let log_clone = self.log.clone();
let f = sink let f = sink
.fail(RpcStatus::new( .fail(RpcStatus::new(
RpcStatusCode::FailedPrecondition, RpcStatusCode::FailedPrecondition,
Some("Beacon state cache is not built".to_string()), Some(format!("Beacon state error {:?}", e)),
)) ))
//TODO: Handle error correctly .map_err(move |e| warn!(log_clone, "Failed to reply {:?}: {:?}", req, e));
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); return ctx.spawn(f);
}
};
// get attestation duties and check if validator is active
let attestation_duties = match state.get_attestation_duties(val_index, &spec) {
Ok(Some(v)) => v,
Ok(_) => {
// validator is inactive, go to the next validator
warn!(
self.log,
"RPC requested an inactive validator key: {:?}", public_key
);
active_validator.set_none(false);
resp_validators.push(active_validator);
break;
}
// the cache is not built, throw an error
Err(e) => {
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::FailedPrecondition,
Some(format!("Beacon state error {:?}", e)),
))
.map_err(move |e| warn!(log_clone, "Failed to reply {:?}: {:?}", req, e));
return ctx.spawn(f); return ctx.spawn(f);
} }
}; };
@ -100,33 +145,15 @@ impl ValidatorService for ValidatorServiceInstance {
let mut duty = ValidatorDuty::new(); let mut duty = ValidatorDuty::new();
// check if the validator needs to propose a block // check if the validator needs to propose a block
if let Some(slot) = validator_proposers if let Some(slot) = validator_proposers.iter().position(|&v| val_index == v) {
.iter() duty.set_block_production_slot(
.position(|&v| val_index as u64 == v) epoch.start_slot(spec.slots_per_epoch).as_u64() + slot as u64,
{ );
duty.set_block_production_slot(epoch * spec.slots_per_epoch + slot as u64);
} else { } else {
// no blocks to propose this epoch // no blocks to propose this epoch
duty.set_none(false) duty.set_none(false)
} }
// get attestation duties
let attestation_duties = match state.get_attestation_duties(val_index, &spec) {
Ok(Some(v)) => v,
Ok(_) => unreachable!(), //we've checked the validator index
// the cache is not built, throw an error
Err(_) => {
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::FailedPrecondition,
Some("Beacon state cache is not built".to_string()),
))
//TODO: Handle error correctly
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
return ctx.spawn(f);
}
};
duty.set_committee_index(attestation_duties.committee_index as u64); duty.set_committee_index(attestation_duties.committee_index as u64);
duty.set_attestation_slot(attestation_duties.slot.as_u64()); duty.set_attestation_slot(attestation_duties.slot.as_u64());
duty.set_attestation_shard(attestation_duties.shard); duty.set_attestation_shard(attestation_duties.shard);

View File

@ -291,35 +291,6 @@ impl Service {
/* /*
for keypair in keypairs {
info!(self.log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id());
// Spawn a new thread to maintain the validator's `EpochDuties`.
let duties_manager_thread = {
let spec = spec.clone();
let duties_map = duties_map.clone();
let slot_clock = self.slot_clock.clone();
let log = self.log.clone();
let beacon_node = self.validator_client.clone();
let pubkey = keypair.pk.clone();
thread::spawn(move || {
let manager = DutiesManager {
duties_map,
pubkey,
spec,
slot_clock,
beacon_node,
};
let mut duties_manager_service = DutiesManagerService {
manager,
poll_interval_millis,
log,
};
duties_manager_service.run();
})
};
// Spawn a new thread to perform block production for the validator. // Spawn a new thread to perform block production for the validator.
let producer_thread = { let producer_thread = {
let spec = spec.clone(); let spec = spec.clone();