Registers the attester service to the beacon node RPC client

This commit is contained in:
Age Manning 2019-03-30 19:32:32 +11:00
parent e1befe9d3a
commit fc5142c09a
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
11 changed files with 425 additions and 140 deletions

View File

@ -5,7 +5,9 @@ mod errors;
pub mod initialise;
pub mod test_utils;
pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock};
pub use self::beacon_chain::{
AttestationValidationError, BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock,
};
pub use self::checkpoint::CheckPoint;
pub use self::errors::{BeaconChainError, BlockProductionError};
pub use attestation_aggregator::Outcome as AggregationOutcome;

View File

@ -0,0 +1,143 @@
use crate::beacon_chain::BeaconChain;
use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use protos::services::{
AttestationData as AttestationDataProto, ProduceAttestationDataRequest,
ProduceAttestationDataResponse, PublishAttestationRequest, PublishAttestationResponse,
};
use protos::services_grpc::AttestationService;
use slog::{error, info, trace, warn, Logger};
use ssz::{ssz_encode, Decodable};
use std::sync::Arc;
use types::Attestation;
#[derive(Clone)]
pub struct AttestationServiceInstance {
pub chain: Arc<BeaconChain>,
pub log: slog::Logger,
}
impl AttestationService for AttestationServiceInstance {
/// Produce the `AttestationData` for signing by a validator.
fn produce_attestation_data(
&mut self,
ctx: RpcContext,
req: ProduceAttestationDataRequest,
sink: UnarySink<ProduceAttestationDataResponse>,
) {
warn!(
&self.log,
"Attempting to produce attestation at slot {}",
req.get_slot()
);
// verify the slot, drop lock on state afterwards
{
let slot_requested = req.get_slot();
let state = self.chain.get_state();
// Start by performing some checks
// Check that the the AttestionData is for the current slot (otherwise it will not be valid)
if slot_requested != state.slot.as_u64() {
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::OutOfRange,
Some(format!(
"AttestationData request for a slot that is not the current slot."
)),
))
.map_err(move |e| {
error!(log_clone, "Failed to reply with failure {:?}: {:?}", req, e)
});
return ctx.spawn(f);
}
}
// Then get the AttestationData from the beacon chain
let shard = req.get_shard();
let attestation_data = match self.chain.produce_attestation_data(shard) {
Ok(v) => v,
Err(e) => {
// Could not produce an attestation
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::Unknown,
Some(format!("Could not produce an attestation: {:?}", e)),
))
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
return ctx.spawn(f);
}
};
let mut attestation_data_proto = AttestationDataProto::new();
attestation_data_proto.set_ssz(ssz_encode(&attestation_data));
let mut resp = ProduceAttestationDataResponse::new();
resp.set_attestation_data(attestation_data_proto);
let error_log = self.log.clone();
let f = sink
.success(resp)
.map_err(move |e| error!(error_log, "Failed to reply with success {:?}: {:?}", req, e));
ctx.spawn(f)
}
/// Accept some fully-formed `FreeAttestation` from the validator,
/// store it, and aggregate it into an `Attestation`.
fn publish_attestation(
&mut self,
ctx: RpcContext,
req: PublishAttestationRequest,
sink: UnarySink<PublishAttestationResponse>,
) {
warn!(self.log, "Publishing attestation");
let mut resp = PublishAttestationResponse::new();
let ssz_serialized_attestation = req.get_attestation().get_ssz();
let attestation = match Attestation::ssz_decode(ssz_serialized_attestation, 0) {
Ok((v, _index)) => v,
Err(_) => {
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::InvalidArgument,
Some("Invalid attestation".to_string()),
))
.map_err(move |_| warn!(log_clone, "failed to reply {:?}", req));
return ctx.spawn(f);
}
};
match self.chain.process_attestation(attestation) {
Ok(_) => {
// Attestation was successfully processed.
info!(
self.log,
"PublishAttestation";
"type" => "valid_attestation",
);
resp.set_success(true);
}
Err(e) => {
// Attestation was invalid
warn!(
self.log,
"PublishAttestation";
"type" => "invalid_attestation",
);
resp.set_success(false);
resp.set_msg(format!("InvalidAttestation: {:?}", e).as_bytes().to_vec());
}
};
let error_log = self.log.clone();
let f = sink
.success(resp)
.map_err(move |e| error!(error_log, "failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}

View File

@ -1,131 +0,0 @@
use crate::beacon_chain::BeaconChain;
use futures::Future;
use grpcio::{RpcContext, UnarySink, RpcStatus, RpcStatusCode};
use protos::services::{
AttestationData as AttestationDataProto, ProduceAttestationData, ProduceAttestationDataResponse,
ProduceAttestationDataRequest, PublishAttestationResponse, PublishAttestationRequest,
PublishAttestation
};
use protos::services_grpc::BeaconBlockService;
use slog::{Logger, info, warn, error, trace};
#[derive(Clone)]
pub struct AttestationServiceInstance {
pub chain: Arc<BeaconChain>,
pub log: Logger,
}
impl AttestationService for AttestationServiceInstance {
/// Produce the `AttestationData` for signing by a validator.
fn produce_attestation_data(
&mut self,
ctx: RpcContext,
req: ProduceAttestationDataRequest,
sink: UnarySink<ProduceAttestationDataResponse>,
) {
trace!(&self.log, "Attempting to produce attestation at slot {}", req.get_slot());
// verify the slot, drop lock on state afterwards
{
let slot_requested = req.get_slot();
let state = self.chain.get_state();
// Start by performing some checks
// Check that the the AttestionData is for the current slot (otherwise it will not be valid)
if slot_requested != state.slot {
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::OutOfRange,
"AttestationData request for a slot that is not the current slot."
))
.map_err(move |e| error!(&self.log, "Failed to reply with failure {:?}: {:?}", req, e));
}
}
// Then get the AttestationData from the beacon chain
let attestation_data = match self.chain.produce_attestation_data(req.get_shard()){
Ok(v) => v,
Err(e) => {
// Could not produce an attestation
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::Unknown
Some(format!("Could not produce an attestation: {:?}",e)),
))
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
return ctx.spawn(f);
}
};
let mut attestation_data_proto = AttestationDataProto::new();
attestation_data_proto.set_ssz(ssz_encode(&attestation_data));
let mut resp = ProduceAttestationDataResponse::new();
resp.set_attestation_data(attestation_data_proto);
let f = sink
.success(resp)
.map_err(move |e| error!("Failed to reply with success {:?}: {:?}", req, e));
ctx.spawn(f)
}
/// Accept some fully-formed `FreeAttestation` from the validator,
/// store it, and aggregate it into an `Attestation`.
fn publish_attestation(
&mut self,
ctx: RpcContext,
req: PublishAttestationRequest,
sink: UnarySink<PublishAttestationResponse>,
) {
trace!(self.log, "Publishing attestation");
let mut resp = PublishAttestationResponse::new();
let ssz_serialized_attestation = req.get_attestation().get_ssz();
let attestation = match Attestation::ssz_decode(ssz_serialized_attestation, 0) {
Ok((v, _index)) => v,
Err(_) => {
let log_clone = self.log.clone();
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::InvalidArgument,
Some("Invalid attestation".to_string()),
))
.map_err(move |e| warn!(log_clone, "failed to reply {:?}", req));
return ctx.spawn(f);
}
};
match self.chain.process_attestation(attestation) {
Ok(_) => {
// Attestation was successfully processed.
info!(
self.log,
"PublishAttestation";
"type" => "valid_attestation",
);
resp.set_success(true);
},
Err(e)=> {
// Attestation was invalid
warn!(
self.log,
"PublishAttestation";
"type" => "invalid_attestation",
);
resp.set_success(false);
resp.set_msg(
format!("InvalidAttestation: {:?}", e).as_bytes().to_vec(),
);
}
};
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}

View File

@ -36,8 +36,8 @@ impl BeaconBlockService for BeaconBlockServiceInstance {
// decode the request
// TODO: requested slot currently unused, see: https://github.com/sigp/lighthouse/issues/336
let _requested_slot = Slot::from(req.get_slot());
let (randao_reveal, _index) = match Signature::ssz_decode(req.get_randao_reveal(), 0) {
Ok(v) => v,
let randao_reveal = match Signature::ssz_decode(req.get_randao_reveal(), 0) {
Ok((reveal, _index)) => reveal,
Err(_) => {
// decode error, incorrect signature
let log_clone = self.log.clone();
@ -86,6 +86,8 @@ impl BeaconBlockService for BeaconBlockServiceInstance {
req: PublishBeaconBlockRequest,
sink: UnarySink<PublishBeaconBlockResponse>,
) {
trace!(&self.log, "Attempting to publish a block");
let mut resp = PublishBeaconBlockResponse::new();
let ssz_serialized_block = req.get_block().get_ssz();

View File

@ -5,10 +5,10 @@ use beacon_chain::{
parking_lot::RwLockReadGuard,
slot_clock::SlotClock,
types::{BeaconState, ChainSpec, Signature},
BlockProductionError,
AttestationValidationError, BlockProductionError,
};
pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome};
use types::BeaconBlock;
use types::{Attestation, AttestationData, BeaconBlock};
/// The RPC's API to the beacon chain.
pub trait BeaconChain: Send + Sync {
@ -23,6 +23,13 @@ pub trait BeaconChain: Send + Sync {
&self,
randao_reveal: Signature,
) -> Result<(BeaconBlock, BeaconState), BlockProductionError>;
fn produce_attestation_data(&self, shard: u64) -> Result<AttestationData, BeaconChainError>;
fn process_attestation(
&self,
attestation: Attestation,
) -> Result<(), AttestationValidationError>;
}
impl<T, U, F> BeaconChain for RawBeaconChain<T, U, F>
@ -52,4 +59,15 @@ where
) -> Result<(BeaconBlock, BeaconState), BlockProductionError> {
self.produce_block(randao_reveal)
}
fn produce_attestation_data(&self, shard: u64) -> Result<AttestationData, BeaconChainError> {
self.produce_attestation_data(shard)
}
fn process_attestation(
&self,
attestation: Attestation,
) -> Result<(), AttestationValidationError> {
self.process_attestation(attestation)
}
}

View File

@ -1,19 +1,22 @@
mod attestation;
mod beacon_block;
pub mod beacon_chain;
mod beacon_node;
pub mod config;
mod validator;
use self::attestation::AttestationServiceInstance;
use self::beacon_block::BeaconBlockServiceInstance;
use self::beacon_chain::BeaconChain;
use self::beacon_node::BeaconNodeServiceInstance;
use self::validator::ValidatorServiceInstance;
pub use config::Config as RPCConfig;
use futures::{future, Future};
use grpcio::{Environment, Server, ServerBuilder};
use futures::Future;
use grpcio::{Environment, ServerBuilder};
use network::NetworkMessage;
use protos::services_grpc::{
create_beacon_block_service, create_beacon_node_service, create_validator_service,
create_attestation_service, create_beacon_block_service, create_beacon_node_service,
create_validator_service,
};
use slog::{info, o, warn};
use std::sync::Arc;
@ -56,11 +59,19 @@ pub fn start_server(
};
create_validator_service(instance)
};
let attestation_service = {
let instance = AttestationServiceInstance {
chain: beacon_chain.clone(),
log: log.clone(),
};
create_attestation_service(instance)
};
let mut server = ServerBuilder::new(env)
.register_service(beacon_block_service)
.register_service(validator_service)
.register_service(beacon_node_service)
.register_service(attestation_service)
.bind(config.listen_address.to_string(), config.port)
.build()
.unwrap();

View File

@ -120,7 +120,7 @@ impl TestingBeaconStateBuilder {
})
.collect();
let genesis_time = 1553918534; // arbitrary
let genesis_time = 1553932445; // arbitrary
let mut state = BeaconState::genesis(
genesis_time,

View File

@ -0,0 +1,23 @@
//TODO: generalise these enums to the crate
use crate::block_producer::{BeaconNodeError, PublishOutcome};
use types::{Attestation, AttestationData, Slot};
/// Defines the methods required to produce and publish attestations on a Beacon Node. Abstracts the
/// actual beacon node.
pub trait BeaconNodeAttestation: Send + Sync {
/// Request that the node produces the required attestation data.
///
fn produce_attestation_data(
&self,
slot: Slot,
shard: u64,
) -> Result<AttestationData, BeaconNodeError>;
/// Request that the node publishes a attestation.
///
/// Returns `true` if the publish was successful.
fn publish_attestation(
&self,
attestation: Attestation,
) -> Result<PublishOutcome, BeaconNodeError>;
}

View File

@ -0,0 +1,59 @@
use super::beacon_node_attestation::BeaconNodeAttestation;
use crate::block_producer::{BeaconNodeError, PublishOutcome};
use protos::services_grpc::AttestationServiceClient;
use ssz::{ssz_encode, Decodable};
use protos::services::{
Attestation as GrpcAttestation, ProduceAttestationDataRequest, PublishAttestationRequest,
};
use types::{Attestation, AttestationData, Slot};
impl BeaconNodeAttestation for AttestationServiceClient {
fn produce_attestation_data(
&self,
slot: Slot,
shard: u64,
) -> Result<AttestationData, BeaconNodeError> {
let mut req = ProduceAttestationDataRequest::new();
req.set_slot(slot.as_u64());
req.set_shard(shard);
let reply = self
.produce_attestation_data(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
dbg!("Produced Attestation Data");
let (attestation_data, _index) =
AttestationData::ssz_decode(reply.get_attestation_data().get_ssz(), 0)
.map_err(|_| BeaconNodeError::DecodeFailure)?;
Ok(attestation_data)
}
fn publish_attestation(
&self,
attestation: Attestation,
) -> Result<PublishOutcome, BeaconNodeError> {
let mut req = PublishAttestationRequest::new();
let ssz = ssz_encode(&attestation);
let mut grpc_attestation = GrpcAttestation::new();
grpc_attestation.set_ssz(ssz);
req.set_attestation(grpc_attestation);
let reply = self
.publish_attestation(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
if reply.get_success() {
Ok(PublishOutcome::Valid)
} else {
// TODO: distinguish between different errors
Ok(PublishOutcome::InvalidAttestation(
"Publish failed".to_string(),
))
}
}
}

View File

@ -0,0 +1,155 @@
mod beacon_node_attestation;
mod grpc;
use std::sync::Arc;
use types::{BeaconBlock, ChainSpec, Domain, Fork, Slot};
//TODO: Move these higher up in the crate
use super::block_producer::{BeaconNodeError, ValidatorEvent};
use crate::signer::Signer;
use beacon_node_attestation::BeaconNodeAttestation;
use slog::{error, info, warn};
use ssz::TreeHash;
use types::{
AggregateSignature, Attestation, AttestationData, AttestationDataAndCustodyBit,
AttestationDuty, Bitfield,
};
//TODO: Group these errors at a crate level
#[derive(Debug, PartialEq)]
pub enum Error {
BeaconNodeError(BeaconNodeError),
}
impl From<BeaconNodeError> for Error {
fn from(e: BeaconNodeError) -> Error {
Error::BeaconNodeError(e)
}
}
/// This struct contains the logic for requesting and signing beacon attestations for a validator. The
/// validator can abstractly sign via the Signer trait object.
pub struct AttestationProducer<'a, B: BeaconNodeAttestation, S: Signer> {
/// The current fork.
pub fork: Fork,
/// The attestation duty to perform.
pub duty: AttestationDuty,
/// The current epoch.
pub spec: Arc<ChainSpec>,
/// The beacon node to connect to.
pub beacon_node: Arc<B>,
/// The signer to sign the block.
pub signer: &'a S,
}
impl<'a, B: BeaconNodeAttestation, S: Signer> AttestationProducer<'a, B, S> {
/// Handle outputs and results from attestation production.
pub fn handle_produce_attestation(&mut self, log: slog::Logger) {
match self.produce_attestation() {
Ok(ValidatorEvent::AttestationProduced(_slot)) => {
info!(log, "Attestation produced"; "Validator" => format!("{}", self.signer))
}
Err(e) => error!(log, "Attestation production error"; "Error" => format!("{:?}", e)),
Ok(ValidatorEvent::SignerRejection(_slot)) => {
error!(log, "Attestation production error"; "Error" => format!("Signer could not sign the attestation"))
}
Ok(ValidatorEvent::SlashableAttestationNotProduced(_slot)) => {
error!(log, "Attestation production error"; "Error" => format!("Rejected the attestation as it could have been slashed"))
}
Ok(ValidatorEvent::BeaconNodeUnableToProduceAttestation(_slot)) => {
error!(log, "Attestation production error"; "Error" => format!("Beacon node was unable to produce an attestation"))
}
Ok(v) => {
warn!(log, "Unknown result for attestation production"; "Error" => format!("{:?}",v))
}
}
}
/// Produce an attestation, sign it and send it back
///
/// Assumes that an attestation is required at this slot (does not check the duties).
///
/// Ensures the message is not slashable.
///
/// !!! UNSAFE !!!
///
/// The slash-protection code is not yet implemented. There is zero protection against
/// slashing.
pub fn produce_attestation(&mut self) -> Result<ValidatorEvent, Error> {
let epoch = self.duty.slot.epoch(self.spec.slots_per_epoch);
let attestation = self
.beacon_node
.produce_attestation_data(self.duty.slot, self.duty.shard)?;
if self.safe_to_produce(&attestation) {
let domain = self.spec.get_domain(epoch, Domain::Attestation, &self.fork);
if let Some(attestation) = self.sign_attestation(attestation, self.duty, domain) {
self.beacon_node.publish_attestation(attestation)?;
Ok(ValidatorEvent::AttestationProduced(self.duty.slot))
} else {
Ok(ValidatorEvent::SignerRejection(self.duty.slot))
}
} else {
Ok(ValidatorEvent::SlashableAttestationNotProduced(
self.duty.slot,
))
}
}
/// Consumes an attestation, returning the attestation signed by the validators private key.
///
/// Important: this function will not check to ensure the attestation is not slashable. This must be
/// done upstream.
fn sign_attestation(
&mut self,
mut attestation: AttestationData,
duties: AttestationDuty,
domain: u64,
) -> Option<Attestation> {
self.store_produce(&attestation);
// build the aggregate signature
let aggregate_signature = {
let message = AttestationDataAndCustodyBit {
data: attestation.clone(),
custody_bit: false,
}
.hash_tree_root();
let sig = self.signer.sign_message(&message, domain)?;
let mut agg_sig = AggregateSignature::new();
agg_sig.add(&sig);
agg_sig
};
let mut aggregation_bitfield = Bitfield::with_capacity(duties.committee_len);
let custody_bitfield = Bitfield::with_capacity(duties.committee_len);
aggregation_bitfield.set(duties.committee_index, true);
Some(Attestation {
aggregation_bitfield,
data: attestation,
custody_bitfield,
aggregate_signature,
})
}
/// Returns `true` if signing an attestation is safe (non-slashable).
///
/// !!! UNSAFE !!!
///
/// Important: this function is presently stubbed-out. It provides ZERO SAFETY.
fn safe_to_produce(&self, _attestation: &AttestationData) -> bool {
//TODO: Implement slash protection
true
}
/// Record that an attestation was produced so that slashable votes may not be made in the future.
///
/// !!! UNSAFE !!!
///
/// Important: this function is presently stubbed-out. It provides ZERO SAFETY.
fn store_produce(&mut self, _attestation: &AttestationData) {
// TODO: Implement slash protection
}
}

View File

@ -290,6 +290,7 @@ impl<B: BeaconNodeDuties + 'static, S: Signer + 'static> Service<B, S> {
// the return value is a future which returns ready.
// built to be compatible with the tokio runtime.
let _empty = cloned_manager.run_update(current_epoch.clone(), cloned_log.clone());
dbg!("Duties Thread Ended");
});
}
@ -317,6 +318,7 @@ impl<B: BeaconNodeDuties + 'static, S: Signer + 'static> Service<B, S> {
signer,
};
block_producer.handle_produce_block(log);
dbg!("Block produce Thread Ended");
});
}
if work_type.attestation_duty.is_some() {
@ -338,6 +340,7 @@ impl<B: BeaconNodeDuties + 'static, S: Signer + 'static> Service<B, S> {
signer,
};
attestation_producer.handle_produce_attestation(log);
dbg!("Attestation Thread Ended");
});
}
}