Split out validator index gRPC call

This commit is contained in:
Paul Hauner 2019-01-21 22:10:43 +11:00
parent 158ffd7d1c
commit b8ba0cd698
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
7 changed files with 206 additions and 100 deletions

View File

@ -0,0 +1,57 @@
use futures::Future;
use grpcio::{RpcContext, UnarySink};
use protos::services::{
BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse,
PublishBeaconBlockRequest, PublishBeaconBlockResponse,
};
use protos::services_grpc::BeaconBlockService;
use slog::Logger;
#[derive(Clone)]
pub struct BeaconBlockServiceInstance {
pub log: Logger,
}
impl BeaconBlockService for BeaconBlockServiceInstance {
/// Produce a `BeaconBlock` for signing by a validator.
fn produce_beacon_block(
&mut self,
ctx: RpcContext,
req: ProduceBeaconBlockRequest,
sink: UnarySink<ProduceBeaconBlockResponse>,
) {
println!("producing at slot {}", req.get_slot());
// TODO: build a legit block.
let mut block = BeaconBlockProto::new();
block.set_slot(req.get_slot());
block.set_block_root("cats".as_bytes().to_vec());
let mut resp = ProduceBeaconBlockResponse::new();
resp.set_block(block);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
/// Accept some fully-formed `BeaconBlock`, process and publish it.
fn publish_beacon_block(
&mut self,
ctx: RpcContext,
req: PublishBeaconBlockRequest,
sink: UnarySink<PublishBeaconBlockResponse>,
) {
println!("publishing {:?}", req.get_block());
// TODO: actually process the block.
let mut resp = PublishBeaconBlockResponse::new();
resp.set_success(true);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}

View File

@ -1,73 +1,30 @@
mod beacon_block;
mod validator;
use self::beacon_block::BeaconBlockServiceInstance;
use self::validator::ValidatorServiceInstance;
use grpcio::{Environment, Server, ServerBuilder};
use protos::services_grpc::{create_beacon_block_service, create_validator_service};
use std::sync::Arc;
use futures::Future;
use grpcio::{Environment, RpcContext, Server, ServerBuilder, UnarySink};
use protos::services::{
BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse,
PublishBeaconBlockRequest, PublishBeaconBlockResponse,
};
use protos::services_grpc::{create_beacon_block_service, BeaconBlockService};
use slog::{info, Logger};
#[derive(Clone)]
struct BeaconBlockServiceInstance {
log: Logger,
}
impl BeaconBlockService for BeaconBlockServiceInstance {
/// Produce a `BeaconBlock` for signing by a validator.
fn produce_beacon_block(
&mut self,
ctx: RpcContext,
req: ProduceBeaconBlockRequest,
sink: UnarySink<ProduceBeaconBlockResponse>,
) {
println!("producing at slot {}", req.get_slot());
// TODO: build a legit block.
let mut block = BeaconBlockProto::new();
block.set_slot(req.get_slot());
block.set_block_root("cats".as_bytes().to_vec());
let mut resp = ProduceBeaconBlockResponse::new();
resp.set_block(block);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
/// Accept some fully-formed `BeaconBlock`, process and publish it.
fn publish_beacon_block(
&mut self,
ctx: RpcContext,
req: PublishBeaconBlockRequest,
sink: UnarySink<PublishBeaconBlockResponse>,
) {
println!("publishing {:?}", req.get_block());
// TODO: actually process the block.
let mut resp = PublishBeaconBlockResponse::new();
resp.set_success(true);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}
pub fn start_server(log: Logger) -> Server {
let log_clone = log.clone();
let env = Arc::new(Environment::new(1));
let instance = BeaconBlockServiceInstance { log };
let service = create_beacon_block_service(instance);
let beacon_block_service = {
let instance = BeaconBlockServiceInstance { log: log.clone() };
create_beacon_block_service(instance)
};
let validator_service = {
let instance = ValidatorServiceInstance { log: log.clone() };
create_validator_service(instance)
};
let mut server = ServerBuilder::new(env)
.register_service(service)
.register_service(beacon_block_service)
.register_service(validator_service)
.bind("127.0.0.1", 50_051)
.build()
.unwrap();

View File

@ -0,0 +1,52 @@
use futures::Future;
use grpcio::{RpcContext, UnarySink};
use protos::services::{
IndexResponse, ProposeBlockSlotRequest, ProposeBlockSlotResponse, PublicKey as PublicKeyRequest,
};
use protos::services_grpc::ValidatorService;
use slog::{debug, Logger};
#[derive(Clone)]
pub struct ValidatorServiceInstance {
pub log: Logger,
}
impl ValidatorService for ValidatorServiceInstance {
fn validator_index(
&mut self,
ctx: RpcContext,
req: PublicKeyRequest,
sink: UnarySink<IndexResponse>,
) {
debug!(self.log, "RPC got ValidatorIndex"; "public_key" => format!("{:x?}", req.get_public_key()));
let mut resp = IndexResponse::new();
// TODO: return a legit value.
resp.set_index(1);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
fn propose_block_slot(
&mut self,
ctx: RpcContext,
req: ProposeBlockSlotRequest,
sink: UnarySink<ProposeBlockSlotResponse>,
) {
debug!(self.log, "RPC got ProposeBlockSlot"; "epoch" => req.get_epoch(), "validator_index" => req.get_validator_index());
let mut resp = ProposeBlockSlotResponse::new();
// TODO: return a legit value.
resp.set_slot(1);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}

View File

@ -18,7 +18,9 @@ service BeaconBlockService {
}
service ValidatorService {
rpc ValidatorAssignment(ValidatorAssignmentRequest) returns (ValidatorAssignmentResponse);
// rpc ValidatorAssignment(ValidatorAssignmentRequest) returns (ValidatorAssignmentResponse);
rpc ProposeBlockSlot(ProposeBlockSlotRequest) returns (ProposeBlockSlotResponse);
rpc ValidatorIndex(PublicKey) returns (IndexResponse);
}
message BeaconBlock {
@ -49,6 +51,8 @@ message PublishBeaconBlockResponse {
bytes msg = 2;
}
// A validators duties for some epoch.
// TODO: add shard duties.
message ValidatorAssignment {
oneof block_production_slot_oneof {
bool block_production_slot_none = 1;
@ -58,9 +62,33 @@ message ValidatorAssignment {
message ValidatorAssignmentRequest {
uint64 epoch = 1;
bytes public_key = 2;
bytes validator_index = 2;
}
message ValidatorAssignmentResponse {
ValidatorAssignment validator_assignment = 1;
/*
* Propose slot
*/
message ProposeBlockSlotRequest {
uint64 epoch = 1;
uint64 validator_index = 2;
}
message ProposeBlockSlotResponse {
oneof slot_oneof {
bool none = 1;
uint64 slot = 2;
}
}
/*
* Validator Assignment
*/
message PublicKey {
bytes public_key = 1;
}
message IndexResponse {
uint64 index = 1;
}

View File

@ -1,6 +1,8 @@
use super::traits::{BeaconNode, BeaconNodeError};
use super::EpochDuties;
use protos::services::ValidatorAssignmentRequest;
use protos::services::{
IndexResponse, ProposeBlockSlotRequest, ProposeBlockSlotResponse, PublicKey as IndexRequest,
};
use protos::services_grpc::ValidatorServiceClient;
use ssz::ssz_encode;
use types::PublicKey;
@ -11,30 +13,33 @@ impl BeaconNode for ValidatorServiceClient {
epoch: u64,
public_key: &PublicKey,
) -> Result<Option<EpochDuties>, BeaconNodeError> {
let mut req = ValidatorAssignmentRequest::new();
req.set_epoch(epoch);
// Lookup the validator index for the supplied public key.
let validator_index = {
let mut req = IndexRequest::new();
req.set_public_key(ssz_encode(public_key).to_vec());
let resp = self
.validator_index(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
resp.get_index()
};
let mut req = ProposeBlockSlotRequest::new();
req.set_validator_index(validator_index);
req.set_epoch(epoch);
let reply = self
.validator_assignment(&req)
.propose_block_slot(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
if reply.has_validator_assignment() {
let assignment = reply.get_validator_assignment();
let block_production_slot = if assignment.has_block_production_slot() {
Some(assignment.get_block_production_slot())
let block_production_slot = if reply.has_slot() {
Some(reply.get_slot())
} else {
None
};
let duties = EpochDuties {
Ok(Some(EpochDuties {
validator_index,
block_production_slot,
};
Ok(Some(duties))
} else {
Ok(None)
}
}))
}
}

View File

@ -15,6 +15,7 @@ pub use self::service::DutiesManagerService;
#[derive(Debug, PartialEq, Clone, Copy, Default)]
pub struct EpochDuties {
pub validator_index: u64,
pub block_production_slot: Option<u64>,
// Future shard info
}
@ -32,10 +33,10 @@ pub type EpochDutiesMap = HashMap<u64, EpochDuties>;
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum PollOutcome {
NoChange,
NewDuties,
DutiesChanged,
UnknownValidatorOrEpoch,
NoChange(u64, EpochDuties),
NewDuties(u64, EpochDuties),
DutiesChanged(u64, EpochDuties),
UnknownValidatorOrEpoch(u64),
}
#[derive(Debug, PartialEq)]
@ -79,17 +80,17 @@ impl<T: SlotClock, U: BeaconNode> DutiesManager<T, U> {
// If these duties were known, check to see if they're updates or identical.
let result = if let Some(known_duties) = map.get(&epoch) {
if *known_duties == duties {
Ok(PollOutcome::NoChange)
Ok(PollOutcome::NoChange(epoch, duties))
} else {
Ok(PollOutcome::DutiesChanged)
Ok(PollOutcome::DutiesChanged(epoch, duties))
}
} else {
Ok(PollOutcome::NewDuties)
Ok(PollOutcome::NewDuties(epoch, duties))
};
map.insert(epoch, duties);
result
} else {
Ok(PollOutcome::UnknownValidatorOrEpoch)
Ok(PollOutcome::UnknownValidatorOrEpoch(epoch))
}
}
}
@ -129,6 +130,7 @@ mod tests {
// Configure response from the BeaconNode.
beacon_node.set_next_shuffling_result(Ok(Some(EpochDuties {
validator_index: 0,
block_production_slot: Some(10),
})));
@ -139,6 +141,7 @@ mod tests {
// Return new duties.
beacon_node.set_next_shuffling_result(Ok(Some(EpochDuties {
validator_index: 0,
block_production_slot: Some(11),
})));
assert_eq!(manager.poll(), Ok(PollOutcome::DutiesChanged));

View File

@ -17,13 +17,17 @@ impl<T: SlotClock, U: BeaconNode> DutiesManagerService<T, U> {
Err(error) => {
error!(self.log, "Epoch duties poll error"; "error" => format!("{:?}", error))
}
Ok(PollOutcome::NoChange) => debug!(self.log, "No change in duties"),
Ok(PollOutcome::DutiesChanged) => {
info!(self.log, "Duties changed (potential re-org)")
Ok(PollOutcome::NoChange(epoch, _)) => {
debug!(self.log, "No change in duties"; "epoch" => epoch)
}
Ok(PollOutcome::NewDuties) => info!(self.log, "New duties obtained"),
Ok(PollOutcome::UnknownValidatorOrEpoch) => {
error!(self.log, "Epoch or validator unknown")
Ok(PollOutcome::DutiesChanged(epoch, duties)) => {
info!(self.log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(PollOutcome::NewDuties(epoch, duties)) => {
info!(self.log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(PollOutcome::UnknownValidatorOrEpoch(epoch)) => {
error!(self.log, "Epoch or validator unknown"; "epoch" => epoch)
}
};