Implement beacon node side of attestation production gRPC
This commit is contained in:
parent
d12ddae247
commit
d8fd7c8803
@ -7,9 +7,7 @@ use protos::services::{
|
|||||||
PublishAttestation
|
PublishAttestation
|
||||||
};
|
};
|
||||||
use protos::services_grpc::BeaconBlockService;
|
use protos::services_grpc::BeaconBlockService;
|
||||||
use slog::{Logger, info, warn, error};
|
use slog::{Logger, info, warn, error, trace};
|
||||||
|
|
||||||
const TEST_SHARD_PHASE_ZERO: u8 = 0;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct AttestationServiceInstance {
|
pub struct AttestationServiceInstance {
|
||||||
@ -25,13 +23,12 @@ impl AttestationService for AttestationServiceInstance {
|
|||||||
req: ProduceAttestationDataRequest,
|
req: ProduceAttestationDataRequest,
|
||||||
sink: UnarySink<ProduceAttestationDataResponse>,
|
sink: UnarySink<ProduceAttestationDataResponse>,
|
||||||
) {
|
) {
|
||||||
info!(&self.log, "Attempting to produce attestation at slot {}", req.get_slot());
|
trace!(&self.log, "Attempting to produce attestation at slot {}", req.get_slot());
|
||||||
|
|
||||||
// get the chain spec & state
|
|
||||||
let spec = self.chain.get_spec();
|
|
||||||
let state = self.chain.get_state();
|
|
||||||
|
|
||||||
|
// verify the slot, drop lock on state afterwards
|
||||||
|
{
|
||||||
let slot_requested = req.get_slot();
|
let slot_requested = req.get_slot();
|
||||||
|
let state = self.chain.get_state();
|
||||||
|
|
||||||
// Start by performing some checks
|
// Start by performing some checks
|
||||||
// Check that the the AttestionData is for the current slot (otherwise it will not be valid)
|
// Check that the the AttestionData is for the current slot (otherwise it will not be valid)
|
||||||
@ -43,12 +40,30 @@ impl AttestationService for AttestationServiceInstance {
|
|||||||
))
|
))
|
||||||
.map_err(move |e| error!(&self.log, "Failed to reply with failure {:?}: {:?}", req, e));
|
.map_err(move |e| error!(&self.log, "Failed to reply with failure {:?}: {:?}", req, e));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Then get the AttestationData from the beacon chain (for shard 0 for now)
|
// Then get the AttestationData from the beacon chain
|
||||||
let attestation_data = self.chain.produce_attestation_data(TEST_SHARD_PHASE_ZERO);
|
let attestation_data = match self.chain.produce_attestation_data(req.get_shard()){
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => {
|
||||||
|
// Could not produce an attestation
|
||||||
|
let log_clone = self.log.clone();
|
||||||
|
let f = sink
|
||||||
|
.fail(RpcStatus::new(
|
||||||
|
RpcStatusCode::Unknown
|
||||||
|
Some(format!("Could not produce an attestation: {:?}",e)),
|
||||||
|
))
|
||||||
|
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
|
||||||
|
return ctx.spawn(f);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
let mut attestation_data_proto = AttestationDataProto::new();
|
||||||
|
attestation_data_proto.set_ssz(ssz_encode(&attestation_data));
|
||||||
|
|
||||||
let mut resp = ProduceAttestationDataResponse::new();
|
let mut resp = ProduceAttestationDataResponse::new();
|
||||||
resp.set_attestation_data(attestation_data);
|
resp.set_attestation_data(attestation_data_proto);
|
||||||
|
|
||||||
let f = sink
|
let f = sink
|
||||||
.success(resp)
|
.success(resp)
|
||||||
@ -64,12 +79,49 @@ impl AttestationService for AttestationServiceInstance {
|
|||||||
req: PublishAttestationRequest,
|
req: PublishAttestationRequest,
|
||||||
sink: UnarySink<PublishAttestationResponse>,
|
sink: UnarySink<PublishAttestationResponse>,
|
||||||
) {
|
) {
|
||||||
println!("publishing attestation {:?}", req.get_block());
|
trace!(self.log, "Publishing attestation");
|
||||||
|
|
||||||
// TODO: actually process the block.
|
|
||||||
let mut resp = PublishAttestationResponse::new();
|
let mut resp = PublishAttestationResponse::new();
|
||||||
|
let ssz_serialized_attestation = req.get_attestation().get_ssz();
|
||||||
|
|
||||||
|
let attestation = match Attestation::ssz_decode(ssz_serialized_attestation, 0) {
|
||||||
|
Ok((v, _index)) => v,
|
||||||
|
Err(_) => {
|
||||||
|
let log_clone = self.log.clone();
|
||||||
|
let f = sink
|
||||||
|
.fail(RpcStatus::new(
|
||||||
|
RpcStatusCode::InvalidArgument,
|
||||||
|
Some("Invalid attestation".to_string()),
|
||||||
|
))
|
||||||
|
.map_err(move |e| warn!(log_clone, "failed to reply {:?}", req));
|
||||||
|
return ctx.spawn(f);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match self.chain.process_attestation(attestation) {
|
||||||
|
Ok(_) => {
|
||||||
|
// Attestation was successfully processed.
|
||||||
|
info!(
|
||||||
|
self.log,
|
||||||
|
"PublishAttestation";
|
||||||
|
"type" => "valid_attestation",
|
||||||
|
);
|
||||||
|
|
||||||
resp.set_success(true);
|
resp.set_success(true);
|
||||||
|
},
|
||||||
|
Err(e)=> {
|
||||||
|
// Attestation was invalid
|
||||||
|
warn!(
|
||||||
|
self.log,
|
||||||
|
"PublishAttestation";
|
||||||
|
"type" => "invalid_attestation",
|
||||||
|
);
|
||||||
|
resp.set_success(false);
|
||||||
|
resp.set_msg(
|
||||||
|
format!("InvalidAttestation: {:?}", e).as_bytes().to_vec(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let f = sink
|
let f = sink
|
||||||
.success(resp)
|
.success(resp)
|
||||||
|
Loading…
Reference in New Issue
Block a user