Merges in validator client branch

This commit is contained in:
Age Manning 2019-03-30 13:17:24 +11:00
commit ba771282fa
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
15 changed files with 129 additions and 73 deletions

View File

@ -479,7 +479,7 @@ where
}
/// Produce an `AttestationData` that is valid for the present `slot` and given `shard`.
pub fn produce_attestation(&self, shard: u64) -> Result<AttestationData, Error> {
pub fn produce_attestation_data(&self, shard: u64) -> Result<AttestationData, Error> {
trace!("BeaconChain::produce_attestation: shard: {}", shard);
let source_epoch = self.state.read().current_justified_epoch;
let source_root = *self.state.read().get_block_root(

View File

@ -50,12 +50,12 @@ impl<T: ClientDB, U: SlotClock, F: ForkChoice> DirectBeaconNode<T, U, F> {
}
impl<T: ClientDB, U: SlotClock, F: ForkChoice> AttesterBeaconNode for DirectBeaconNode<T, U, F> {
fn produce_attestation(
fn produce_attestation_data(
&self,
_slot: Slot,
shard: u64,
) -> Result<Option<AttestationData>, NodeError> {
match self.beacon_chain.produce_attestation(shard) {
match self.beacon_chain.produce_attestation_data(shard) {
Ok(attestation_data) => Ok(Some(attestation_data)),
Err(e) => Err(NodeError::RemoteFailure(format!("{:?}", e))),
}

View File

@ -1,45 +1,63 @@
use crate::beacon_chain::BeaconChain;
use futures::Future;
use grpcio::{RpcContext, UnarySink};
use grpcio::{RpcContext, UnarySink, RpcStatus, RpcStatusCode};
use protos::services::{
Attestation as AttestationProto, ProduceAttestation, ProduceAttestationResponse,
ProduceAttestationRequest, PublishAttestationResponse, PublishAttestationRequest,
AttestationData as AttestationDataProto, ProduceAttestationData, ProduceAttestationDataResponse,
ProduceAttestationDataRequest, PublishAttestationResponse, PublishAttestationRequest,
PublishAttestation
};
use protos::services_grpc::BeaconBlockService;
use slog::Logger;
use slog::{Logger, info, warn, error};
const TEST_SHARD_PHASE_ZERO: u8 = 0;
#[derive(Clone)]
pub struct AttestationServiceInstance {
pub chain: Arc<BeaconChain>,
pub log: Logger,
}
impl AttestationService for AttestationServiceInstance {
/// Produce a `BeaconBlock` for signing by a validator.
fn produce_attestation(
/// Produce the `AttestationData` for signing by a validator.
fn produce_attestation_data(
&mut self,
ctx: RpcContext,
req: ProduceAttestationRequest,
sink: UnarySink<ProduceAttestationResponse>,
req: ProduceAttestationDataRequest,
sink: UnarySink<ProduceAttestationDataResponse>,
) {
println!("producing attestation at slot {}", req.get_slot());
info!(&self.log, "Attempting to produce attestation at slot {}", req.get_slot());
// TODO: build a legit block.
let mut attestation = AttestationProto::new();
attestation.set_slot(req.get_slot());
// TODO Set the shard to something legit.
attestation.set_shard(0);
attestation.set_block_root(b"cats".to_vec());
// get the chain spec & state
let spec = self.chain.get_spec();
let state = self.chain.get_state();
let mut resp = ProduceAttestationResponse::new();
resp.set_attestation_data(attestation);
let slot_requested = req.get_slot();
// Start by performing some checks
// Check that the the AttestionData is for the current slot (otherwise it will not be valid)
if slot_requested != state.slot {
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::OutOfRange,
"AttestationData request for a slot that is not the current slot."
))
.map_err(move |e| error!(&self.log, "Failed to reply with failure {:?}: {:?}", req, e));
}
// Then get the AttestationData from the beacon chain (for shard 0 for now)
let attestation_data = self.chain.produce_attestation_data(TEST_SHARD_PHASE_ZERO);
let mut resp = ProduceAttestationDataResponse::new();
resp.set_attestation_data(attestation_data);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
.map_err(move |e| error!("Failed to reply with success {:?}: {:?}", req, e));
ctx.spawn(f)
}
/// Accept some fully-formed `BeaconBlock`, process and publish it.
/// Accept some fully-formed `FreeAttestation` from the validator,
/// store it, and aggregate it into an `Attestation`.
fn publish_attestation(
&mut self,
ctx: RpcContext,

View File

@ -94,7 +94,7 @@ impl<T: SlotClock, U: BeaconNode, V: DutiesReader, W: Signer> Attester<T, U, V,
}
fn produce_attestation(&mut self, slot: Slot, shard: u64) -> Result<PollOutcome, Error> {
let attestation_data = match self.beacon_node.produce_attestation(slot, shard)? {
let attestation_data = match self.beacon_node.produce_attestation_data(slot, shard)? {
Some(attestation_data) => attestation_data,
None => return Ok(PollOutcome::BeaconNodeUnableToProduceAttestation(slot)),
};

View File

@ -26,7 +26,7 @@ impl SimulatedBeaconNode {
}
impl BeaconNode for SimulatedBeaconNode {
fn produce_attestation(&self, slot: Slot, shard: u64) -> ProduceResult {
fn produce_attestation_data(&self, slot: Slot, shard: u64) -> ProduceResult {
*self.produce_input.write().unwrap() = Some((slot, shard));
match *self.produce_result.read().unwrap() {
Some(ref r) => r.clone(),

View File

@ -14,7 +14,7 @@ pub enum PublishOutcome {
/// Defines the methods required to produce and publish blocks on a Beacon Node.
pub trait BeaconNode: Send + Sync {
fn produce_attestation(
fn produce_attestation_data(
&self,
slot: Slot,
shard: u64,

View File

@ -1,5 +1,6 @@
use super::{PublicKey, SecretKey};
use serde_derive::{Deserialize, Serialize};
use std::hash::{Hash, Hasher};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Keypair {
@ -19,3 +20,15 @@ impl Keypair {
self.pk.concatenated_hex_id()
}
}
impl Hash for Keypair {
/// Note: this is distinct from consensus serialization, it will produce a different hash.
///
/// This method uses the uncompressed bytes, which are much faster to obtain than the
/// compressed bytes required for consensus serialization.
///
/// Use `ssz::Encode` to obtain the bytes required for consensus hashing.
fn hash<H: Hasher>(&self, state: &mut H) {
self.pk.as_uncompressed_bytes().hash(state)
}
}

View File

@ -35,7 +35,7 @@ service ValidatorService {
/// Service that handles validator attestations
service AttestationService {
rpc ProduceAttestation(ProduceAttestationRequest) returns (ProduceAttestationResponse);
rpc ProduceAttestationData(ProduceAttestationDataRequest) returns (ProduceAttestationDataResponse);
rpc PublishAttestation(PublishAttestationRequest) returns (PublishAttestationResponse);
}
@ -131,13 +131,13 @@ message ValidatorDuty {
* Attestation Service Messages
*/
message ProduceAttestationRequest {
message ProduceAttestationDataRequest {
uint64 slot = 1;
uint64 shard = 2;
}
message ProduceAttestationResponse {
Attestation attestation_data = 1;
message ProduceAttestationDataResponse {
AttestationData attestation_data = 1;
}
message PublishAttestationRequest {
@ -155,7 +155,7 @@ message Crosslink {
}
message Attestation {
message AttestationData {
uint64 slot = 1;
uint64 shard = 2;
bytes beacon_block_root = 3;
@ -168,7 +168,7 @@ message Attestation {
}
message FreeAttestation {
Attestation attestation_data = 1;
AttestationData data = 1;
bytes signature = 2;
uint64 validator_index = 3;
}

View File

@ -2,8 +2,8 @@ use protos::services_grpc::AttestationServiceClient;
use std::sync::Arc;
use attester::{BeaconNode, BeaconNodeError, PublishOutcome};
use protos::services::ProduceAttestationRequest;
use types::{AttestationData, FreeAttestation, Slot};
use protos::services::ProduceAttestationDataRequest;
use types::{Attestation, AttestationData, Slot};
pub struct AttestationGrpcClient {
client: Arc<AttestationServiceClient>,
@ -14,20 +14,20 @@ impl AttestationGrpcClient {
Self { client }
}
}
/*
impl BeaconNode for AttestationGrpcClient {
fn produce_attestation(
fn produce_attestation_data(
&self,
slot: Slot,
shard: u64,
) -> Result<Option<AttestationData>, BeaconNodeError> {
let mut req = ProduceAttestationRequest::new();
let mut req = ProduceAttestationDataRequest::new();
req.set_slot(slot.as_u64());
req.set_shard(shard);
let reply = self
.client
.produce_attestation(&req)
.produce_attestation_data(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
// TODO: return correct Attestation
@ -36,9 +36,10 @@ impl BeaconNode for AttestationGrpcClient {
fn publish_attestation(
&self,
free_attestation: FreeAttestation,
attestation: Attestation,
) -> Result<PublishOutcome, BeaconNodeError> {
// TODO: return correct PublishOutcome
Err(BeaconNodeError::DecodeFailure)
}
}
*/

View File

@ -6,18 +6,22 @@ use std::time::Duration;
pub use self::attestation_grpc_client::AttestationGrpcClient;
pub struct AttesterService<T: SlotClock, U: BeaconNode, V: DutiesReader, W: Signer> {
pub attester: Attester<T, U, V, W>,
pub struct AttesterService {}
/*
pub struct AttesterService<U: BeaconNode, W: Signer> {
// pub attester: Attester<U, W>,
pub poll_interval_millis: u64,
pub log: Logger,
}
impl<T: SlotClock, U: BeaconNode, V: DutiesReader, W: Signer> AttesterService<T, U, V, W> {
impl<U: BeaconNode, W: Signer> AttesterService<U, W> {
/// Run a loop which polls the Attester each `poll_interval_millis` millseconds.
///
/// Logs the results of the polls.
pub fn run(&mut self) {
loop {
/* We don't do the polling any more...
match self.attester.poll() {
Err(error) => {
error!(self.log, "Attester poll error"; "error" => format!("{:?}", error))
@ -47,8 +51,10 @@ impl<T: SlotClock, U: BeaconNode, V: DutiesReader, W: Signer> AttesterService<T,
error!(self.log, "The Beacon Node does not recognise the validator"; "slot" => slot)
}
};
std::thread::sleep(Duration::from_millis(self.poll_interval_millis));
}
}
*/
println!("Legacy polling still happening...");
std::thread::sleep(Duration::from_millis(self.poll_interval_millis));
}
}
}
*/

View File

@ -1,5 +1,5 @@
use super::EpochDuties;
use types::{Epoch, PublicKey};
use types::{Epoch, Keypair};
#[derive(Debug, PartialEq, Clone)]
pub enum BeaconNodeDutiesError {
@ -15,6 +15,6 @@ pub trait BeaconNodeDuties: Send + Sync {
fn request_duties(
&self,
epoch: Epoch,
pubkeys: &[PublicKey],
signers: &[Keypair],
) -> Result<EpochDuties, BeaconNodeDutiesError>;
}

View File

@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::fmt;
use std::ops::{Deref, DerefMut};
use types::{AttestationDuty, Epoch, PublicKey, Slot};
use types::{AttestationDuty, Epoch, Keypair, Slot};
/// When work needs to be performed by a validator, this type is given back to the main service
/// which indicates all the information that required to process the work.
@ -71,8 +71,8 @@ impl fmt::Display for EpochDuty {
}
}
/// Maps a list of public keys (many validators) to an EpochDuty.
pub type EpochDuties = HashMap<PublicKey, Option<EpochDuty>>;
/// Maps a list of keypairs (many validators) to an EpochDuty.
pub type EpochDuties = HashMap<Keypair, Option<EpochDuty>>;
pub enum EpochDutiesMapError {
UnknownEpoch,
@ -113,7 +113,7 @@ impl EpochDutiesMap {
pub fn is_work_slot(
&self,
slot: Slot,
pubkey: &PublicKey,
signer: &Keypair,
) -> Result<Option<WorkInfo>, EpochDutiesMapError> {
let epoch = slot.epoch(self.slots_per_epoch);
@ -121,7 +121,7 @@ impl EpochDutiesMap {
.map
.get(&epoch)
.ok_or_else(|| EpochDutiesMapError::UnknownEpoch)?;
if let Some(epoch_duty) = epoch_duties.get(pubkey) {
if let Some(epoch_duty) = epoch_duties.get(signer) {
if let Some(duty) = epoch_duty {
// Retrieves the duty for a validator at a given slot
return Ok(duty.is_work_slot(slot));

View File

@ -6,21 +6,21 @@ use protos::services_grpc::ValidatorServiceClient;
use ssz::ssz_encode;
use std::collections::HashMap;
use std::time::Duration;
use types::{Epoch, PublicKey, Slot};
use types::{Epoch, Keypair, Slot};
impl BeaconNodeDuties for ValidatorServiceClient {
/// Requests all duties (block signing and committee attesting) from the Beacon Node (BN).
fn request_duties(
&self,
epoch: Epoch,
pubkeys: &[PublicKey],
signers: &[Keypair],
) -> Result<EpochDuties, BeaconNodeDutiesError> {
// Get the required duties from all validators
// build the request
let mut req = GetDutiesRequest::new();
req.set_epoch(epoch.as_u64());
let mut validators = Validators::new();
validators.set_public_keys(pubkeys.iter().map(|v| ssz_encode(v)).collect());
validators.set_public_keys(signers.iter().map(|v| ssz_encode(&v.pk)).collect());
req.set_validators(validators);
// set a timeout for requests
@ -31,11 +31,11 @@ impl BeaconNodeDuties for ValidatorServiceClient {
.get_validator_duties(&req)
.map_err(|err| BeaconNodeDutiesError::RemoteFailure(format!("{:?}", err)))?;
let mut epoch_duties: HashMap<PublicKey, Option<EpochDuty>> = HashMap::new();
let mut epoch_duties: HashMap<Keypair, Option<EpochDuty>> = HashMap::new();
for (index, validator_duty) in reply.get_active_validators().iter().enumerate() {
if !validator_duty.has_duty() {
// validator is inactive
epoch_duties.insert(pubkeys[index].clone(), None);
epoch_duties.insert(signers[index].clone(), None);
continue;
}
// active validator
@ -53,7 +53,7 @@ impl BeaconNodeDuties for ValidatorServiceClient {
attestation_shard: active_duty.get_attestation_shard(),
committee_index: active_duty.get_committee_index(),
};
epoch_duties.insert(pubkeys[index].clone(), Some(epoch_duty));
epoch_duties.insert(signers[index].clone(), Some(epoch_duty));
}
Ok(epoch_duties)
}

View File

@ -12,7 +12,7 @@ use futures::Async;
use slog::{debug, error, info};
use std::sync::Arc;
use std::sync::RwLock;
use types::{Epoch, PublicKey, Slot};
use types::{Epoch, Keypair, Slot};
#[derive(Debug, PartialEq, Clone)]
pub enum UpdateOutcome {
@ -40,8 +40,9 @@ pub enum Error {
/// This keeps track of all validator keys and required voting slots.
pub struct DutiesManager<U: BeaconNodeDuties> {
pub duties_map: RwLock<EpochDutiesMap>,
/// A list of all public keys known to the validator service.
pub pubkeys: Vec<PublicKey>,
/// A list of all signer objects known to the validator service.
// TODO: Generalise the signers, so that they're not just keypairs
pub signers: Arc<Vec<Keypair>>,
pub beacon_node: Arc<U>,
}
@ -50,7 +51,7 @@ impl<U: BeaconNodeDuties> DutiesManager<U> {
///
/// be a wall-clock (e.g., system time, remote server time, etc.).
fn update(&self, epoch: Epoch) -> Result<UpdateOutcome, Error> {
let duties = self.beacon_node.request_duties(epoch, &self.pubkeys)?;
let duties = self.beacon_node.request_duties(epoch, &self.signers)?;
{
// If these duties were known, check to see if they're updates or identical.
if let Some(known_duties) = self.duties_map.read()?.get(&epoch) {
@ -91,17 +92,18 @@ impl<U: BeaconNodeDuties> DutiesManager<U> {
/// Returns a list of (Public, WorkInfo) indicating all the validators that have work to perform
/// this slot.
pub fn get_current_work(&self, slot: Slot) -> Option<Vec<(PublicKey, WorkInfo)>> {
let mut current_work: Vec<(PublicKey, WorkInfo)> = Vec::new();
pub fn get_current_work(&self, slot: Slot) -> Option<Vec<(Keypair, WorkInfo)>> {
let mut current_work: Vec<(Keypair, WorkInfo)> = Vec::new();
// if the map is poisoned, return None
let duties = self.duties_map.read().ok()?;
for validator_pk in &self.pubkeys {
match duties.is_work_slot(slot, &validator_pk) {
Ok(Some(work_type)) => current_work.push((validator_pk.clone(), work_type)),
for validator_signer in self.signers.iter() {
match duties.is_work_slot(slot, &validator_signer) {
Ok(Some(work_type)) => current_work.push((validator_signer.clone(), work_type)),
Ok(None) => {} // No work for this validator
Err(_) => {} // Unknown epoch or validator, no work
//TODO: This should really log an error, as we shouldn't end up with an err here.
Err(_) => {} // Unknown epoch or validator, no work
}
}
if current_work.is_empty() {
@ -136,9 +138,9 @@ impl From<EpochDutiesMapError> for Error {
fn print_duties(log: &slog::Logger, duties: EpochDuties) {
for (pk, duty) in duties.iter() {
if let Some(display_duty) = duty {
info!(log, "Validator: {}",pk; "Duty" => format!("{}",display_duty));
info!(log, "Validator: {:?}",pk; "Duty" => format!("{}",display_duty));
} else {
info!(log, "Validator: {}",pk; "Duty" => "None");
info!(log, "Validator: {:?}",pk; "Duty" => "None");
}
}
}

View File

@ -27,6 +27,7 @@ use slog::{debug, error, info, warn};
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::sync::Arc;
use std::sync::RwLock;
use std::thread;
use std::time::{Duration, Instant, SystemTime};
use tokio::prelude::*;
use tokio::runtime::Builder;
@ -37,6 +38,8 @@ use types::{ChainSpec, Epoch, Fork, Slot};
//TODO: This service should be simplified in the future. Can be made more steamlined.
const POLL_INTERVAL_MILLIS: u64 = 100;
/// The validator service. This is the main thread that executes and maintains validator
/// duties.
//TODO: Generalize the BeaconNode types to use testing
@ -180,7 +183,7 @@ impl<B: BeaconNodeDuties + 'static> Service<B> {
// and can check when a validator needs to perform a task.
let duties_manager = Arc::new(DutiesManager {
duties_map,
pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(),
signers: keypairs,
beacon_node: validator_client,
});
@ -314,8 +317,21 @@ impl<B: BeaconNodeDuties + 'static> Service<B> {
}
if work_type.attestation_duty.is_some() {
// available AttestationDuty info
let attestation_duty = work_type.attestation_duty.expect("Cannot be None");
//TODO: Produce an attestation in a new thread
/*
let attestation_duty =
work_type.attestation_duty.expect("Cannot be None");
let attester_grpc_client = Arc::new(AttestationGrpcClient::new(
service.attester_client.clone(),
));
let signer = Arc::new(AttesterLocalSigner::new(keypair.clone()));
let attester = Attester::new(attester_grpc_client, signer);
let mut attester_service = AttesterService {
attester,
poll_interval_millis: POLL_INTERVAL_MILLIS,
log: log.clone(),
};
attester_service.run();
*/
}
}
}