From b8ba0cd6982af9b0a1e6b82cd7ab8a796f82f15c Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 21 Jan 2019 22:10:43 +1100 Subject: [PATCH] Split out validator index gRPC call --- beacon_node/src/rpc/beacon_block.rs | 57 ++++++++++++++++++ beacon_node/src/rpc/mod.rs | 81 ++++++-------------------- beacon_node/src/rpc/validator.rs | 52 +++++++++++++++++ protos/src/services.proto | 36 ++++++++++-- validator_client/src/duties/grpc.rs | 45 +++++++------- validator_client/src/duties/mod.rs | 19 +++--- validator_client/src/duties/service.rs | 16 +++-- 7 files changed, 206 insertions(+), 100 deletions(-) create mode 100644 beacon_node/src/rpc/beacon_block.rs create mode 100644 beacon_node/src/rpc/validator.rs diff --git a/beacon_node/src/rpc/beacon_block.rs b/beacon_node/src/rpc/beacon_block.rs new file mode 100644 index 000000000..a047365ef --- /dev/null +++ b/beacon_node/src/rpc/beacon_block.rs @@ -0,0 +1,57 @@ +use futures::Future; +use grpcio::{RpcContext, UnarySink}; +use protos::services::{ + BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse, + PublishBeaconBlockRequest, PublishBeaconBlockResponse, +}; +use protos::services_grpc::BeaconBlockService; +use slog::Logger; + +#[derive(Clone)] +pub struct BeaconBlockServiceInstance { + pub log: Logger, +} + +impl BeaconBlockService for BeaconBlockServiceInstance { + /// Produce a `BeaconBlock` for signing by a validator. + fn produce_beacon_block( + &mut self, + ctx: RpcContext, + req: ProduceBeaconBlockRequest, + sink: UnarySink, + ) { + println!("producing at slot {}", req.get_slot()); + + // TODO: build a legit block. + let mut block = BeaconBlockProto::new(); + block.set_slot(req.get_slot()); + block.set_block_root("cats".as_bytes().to_vec()); + + let mut resp = ProduceBeaconBlockResponse::new(); + resp.set_block(block); + + let f = sink + .success(resp) + .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) + } + + /// Accept some fully-formed `BeaconBlock`, process and publish it. + fn publish_beacon_block( + &mut self, + ctx: RpcContext, + req: PublishBeaconBlockRequest, + sink: UnarySink, + ) { + println!("publishing {:?}", req.get_block()); + + // TODO: actually process the block. + let mut resp = PublishBeaconBlockResponse::new(); + resp.set_success(true); + + let f = sink + .success(resp) + .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) + } +} diff --git a/beacon_node/src/rpc/mod.rs b/beacon_node/src/rpc/mod.rs index 38fc049c7..6a18a4aa8 100644 --- a/beacon_node/src/rpc/mod.rs +++ b/beacon_node/src/rpc/mod.rs @@ -1,73 +1,30 @@ +mod beacon_block; +mod validator; + +use self::beacon_block::BeaconBlockServiceInstance; +use self::validator::ValidatorServiceInstance; +use grpcio::{Environment, Server, ServerBuilder}; +use protos::services_grpc::{create_beacon_block_service, create_validator_service}; use std::sync::Arc; -use futures::Future; -use grpcio::{Environment, RpcContext, Server, ServerBuilder, UnarySink}; - -use protos::services::{ - BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse, - PublishBeaconBlockRequest, PublishBeaconBlockResponse, -}; -use protos::services_grpc::{create_beacon_block_service, BeaconBlockService}; - use slog::{info, Logger}; -#[derive(Clone)] -struct BeaconBlockServiceInstance { - log: Logger, -} - -impl BeaconBlockService for BeaconBlockServiceInstance { - /// Produce a `BeaconBlock` for signing by a validator. - fn produce_beacon_block( - &mut self, - ctx: RpcContext, - req: ProduceBeaconBlockRequest, - sink: UnarySink, - ) { - println!("producing at slot {}", req.get_slot()); - - // TODO: build a legit block. - let mut block = BeaconBlockProto::new(); - block.set_slot(req.get_slot()); - block.set_block_root("cats".as_bytes().to_vec()); - - let mut resp = ProduceBeaconBlockResponse::new(); - resp.set_block(block); - - let f = sink - .success(resp) - .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); - ctx.spawn(f) - } - - /// Accept some fully-formed `BeaconBlock`, process and publish it. - fn publish_beacon_block( - &mut self, - ctx: RpcContext, - req: PublishBeaconBlockRequest, - sink: UnarySink, - ) { - println!("publishing {:?}", req.get_block()); - - // TODO: actually process the block. - let mut resp = PublishBeaconBlockResponse::new(); - resp.set_success(true); - - let f = sink - .success(resp) - .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); - ctx.spawn(f) - } -} - pub fn start_server(log: Logger) -> Server { let log_clone = log.clone(); - let env = Arc::new(Environment::new(1)); - let instance = BeaconBlockServiceInstance { log }; - let service = create_beacon_block_service(instance); + + let beacon_block_service = { + let instance = BeaconBlockServiceInstance { log: log.clone() }; + create_beacon_block_service(instance) + }; + let validator_service = { + let instance = ValidatorServiceInstance { log: log.clone() }; + create_validator_service(instance) + }; + let mut server = ServerBuilder::new(env) - .register_service(service) + .register_service(beacon_block_service) + .register_service(validator_service) .bind("127.0.0.1", 50_051) .build() .unwrap(); diff --git a/beacon_node/src/rpc/validator.rs b/beacon_node/src/rpc/validator.rs new file mode 100644 index 000000000..f0c828872 --- /dev/null +++ b/beacon_node/src/rpc/validator.rs @@ -0,0 +1,52 @@ +use futures::Future; +use grpcio::{RpcContext, UnarySink}; +use protos::services::{ + IndexResponse, ProposeBlockSlotRequest, ProposeBlockSlotResponse, PublicKey as PublicKeyRequest, +}; +use protos::services_grpc::ValidatorService; +use slog::{debug, Logger}; + +#[derive(Clone)] +pub struct ValidatorServiceInstance { + pub log: Logger, +} + +impl ValidatorService for ValidatorServiceInstance { + fn validator_index( + &mut self, + ctx: RpcContext, + req: PublicKeyRequest, + sink: UnarySink, + ) { + debug!(self.log, "RPC got ValidatorIndex"; "public_key" => format!("{:x?}", req.get_public_key())); + + let mut resp = IndexResponse::new(); + + // TODO: return a legit value. + resp.set_index(1); + + let f = sink + .success(resp) + .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) + } + + fn propose_block_slot( + &mut self, + ctx: RpcContext, + req: ProposeBlockSlotRequest, + sink: UnarySink, + ) { + debug!(self.log, "RPC got ProposeBlockSlot"; "epoch" => req.get_epoch(), "validator_index" => req.get_validator_index()); + + let mut resp = ProposeBlockSlotResponse::new(); + + // TODO: return a legit value. + resp.set_slot(1); + + let f = sink + .success(resp) + .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) + } +} diff --git a/protos/src/services.proto b/protos/src/services.proto index 216ef7b2f..16e2d4dba 100644 --- a/protos/src/services.proto +++ b/protos/src/services.proto @@ -18,7 +18,9 @@ service BeaconBlockService { } service ValidatorService { - rpc ValidatorAssignment(ValidatorAssignmentRequest) returns (ValidatorAssignmentResponse); + // rpc ValidatorAssignment(ValidatorAssignmentRequest) returns (ValidatorAssignmentResponse); + rpc ProposeBlockSlot(ProposeBlockSlotRequest) returns (ProposeBlockSlotResponse); + rpc ValidatorIndex(PublicKey) returns (IndexResponse); } message BeaconBlock { @@ -49,6 +51,8 @@ message PublishBeaconBlockResponse { bytes msg = 2; } +// A validators duties for some epoch. +// TODO: add shard duties. message ValidatorAssignment { oneof block_production_slot_oneof { bool block_production_slot_none = 1; @@ -58,9 +62,33 @@ message ValidatorAssignment { message ValidatorAssignmentRequest { uint64 epoch = 1; - bytes public_key = 2; + bytes validator_index = 2; } -message ValidatorAssignmentResponse { - ValidatorAssignment validator_assignment = 1; +/* + * Propose slot + */ + +message ProposeBlockSlotRequest { + uint64 epoch = 1; + uint64 validator_index = 2; +} + +message ProposeBlockSlotResponse { + oneof slot_oneof { + bool none = 1; + uint64 slot = 2; + } +} + +/* + * Validator Assignment + */ + +message PublicKey { + bytes public_key = 1; +} + +message IndexResponse { + uint64 index = 1; } diff --git a/validator_client/src/duties/grpc.rs b/validator_client/src/duties/grpc.rs index 4b9c8288e..fc552516d 100644 --- a/validator_client/src/duties/grpc.rs +++ b/validator_client/src/duties/grpc.rs @@ -1,6 +1,8 @@ use super::traits::{BeaconNode, BeaconNodeError}; use super::EpochDuties; -use protos::services::ValidatorAssignmentRequest; +use protos::services::{ + IndexResponse, ProposeBlockSlotRequest, ProposeBlockSlotResponse, PublicKey as IndexRequest, +}; use protos::services_grpc::ValidatorServiceClient; use ssz::ssz_encode; use types::PublicKey; @@ -11,30 +13,33 @@ impl BeaconNode for ValidatorServiceClient { epoch: u64, public_key: &PublicKey, ) -> Result, BeaconNodeError> { - let mut req = ValidatorAssignmentRequest::new(); + // Lookup the validator index for the supplied public key. + let validator_index = { + let mut req = IndexRequest::new(); + req.set_public_key(ssz_encode(public_key).to_vec()); + let resp = self + .validator_index(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + resp.get_index() + }; + + let mut req = ProposeBlockSlotRequest::new(); + req.set_validator_index(validator_index); req.set_epoch(epoch); - req.set_public_key(ssz_encode(public_key).to_vec()); let reply = self - .validator_assignment(&req) + .propose_block_slot(&req) .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; - if reply.has_validator_assignment() { - let assignment = reply.get_validator_assignment(); - - let block_production_slot = if assignment.has_block_production_slot() { - Some(assignment.get_block_production_slot()) - } else { - None - }; - - let duties = EpochDuties { - block_production_slot, - }; - - Ok(Some(duties)) + let block_production_slot = if reply.has_slot() { + Some(reply.get_slot()) } else { - Ok(None) - } + None + }; + + Ok(Some(EpochDuties { + validator_index, + block_production_slot, + })) } } diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs index 0ac14b07f..acfe108ee 100644 --- a/validator_client/src/duties/mod.rs +++ b/validator_client/src/duties/mod.rs @@ -15,6 +15,7 @@ pub use self::service::DutiesManagerService; #[derive(Debug, PartialEq, Clone, Copy, Default)] pub struct EpochDuties { + pub validator_index: u64, pub block_production_slot: Option, // Future shard info } @@ -32,10 +33,10 @@ pub type EpochDutiesMap = HashMap; #[derive(Debug, PartialEq, Clone, Copy)] pub enum PollOutcome { - NoChange, - NewDuties, - DutiesChanged, - UnknownValidatorOrEpoch, + NoChange(u64, EpochDuties), + NewDuties(u64, EpochDuties), + DutiesChanged(u64, EpochDuties), + UnknownValidatorOrEpoch(u64), } #[derive(Debug, PartialEq)] @@ -79,17 +80,17 @@ impl DutiesManager { // If these duties were known, check to see if they're updates or identical. let result = if let Some(known_duties) = map.get(&epoch) { if *known_duties == duties { - Ok(PollOutcome::NoChange) + Ok(PollOutcome::NoChange(epoch, duties)) } else { - Ok(PollOutcome::DutiesChanged) + Ok(PollOutcome::DutiesChanged(epoch, duties)) } } else { - Ok(PollOutcome::NewDuties) + Ok(PollOutcome::NewDuties(epoch, duties)) }; map.insert(epoch, duties); result } else { - Ok(PollOutcome::UnknownValidatorOrEpoch) + Ok(PollOutcome::UnknownValidatorOrEpoch(epoch)) } } } @@ -129,6 +130,7 @@ mod tests { // Configure response from the BeaconNode. beacon_node.set_next_shuffling_result(Ok(Some(EpochDuties { + validator_index: 0, block_production_slot: Some(10), }))); @@ -139,6 +141,7 @@ mod tests { // Return new duties. beacon_node.set_next_shuffling_result(Ok(Some(EpochDuties { + validator_index: 0, block_production_slot: Some(11), }))); assert_eq!(manager.poll(), Ok(PollOutcome::DutiesChanged)); diff --git a/validator_client/src/duties/service.rs b/validator_client/src/duties/service.rs index bedfd69ca..c51db0960 100644 --- a/validator_client/src/duties/service.rs +++ b/validator_client/src/duties/service.rs @@ -17,13 +17,17 @@ impl DutiesManagerService { Err(error) => { error!(self.log, "Epoch duties poll error"; "error" => format!("{:?}", error)) } - Ok(PollOutcome::NoChange) => debug!(self.log, "No change in duties"), - Ok(PollOutcome::DutiesChanged) => { - info!(self.log, "Duties changed (potential re-org)") + Ok(PollOutcome::NoChange(epoch, _)) => { + debug!(self.log, "No change in duties"; "epoch" => epoch) } - Ok(PollOutcome::NewDuties) => info!(self.log, "New duties obtained"), - Ok(PollOutcome::UnknownValidatorOrEpoch) => { - error!(self.log, "Epoch or validator unknown") + Ok(PollOutcome::DutiesChanged(epoch, duties)) => { + info!(self.log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties)) + } + Ok(PollOutcome::NewDuties(epoch, duties)) => { + info!(self.log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties)) + } + Ok(PollOutcome::UnknownValidatorOrEpoch(epoch)) => { + error!(self.log, "Epoch or validator unknown"; "epoch" => epoch) } };