From 8f3530f60ce900f41d48c2d407e26cb0a727f9b3 Mon Sep 17 00:00:00 2001 From: thojest Date: Fri, 8 Mar 2019 13:48:33 +0100 Subject: [PATCH 1/3] created attester_service and started to create an attester_thread in main of validator_client (lighthouse-255) --- validator_client/Cargo.toml | 1 + validator_client/src/attester_service/mod.rs | 51 ++++++++++++++++++++ validator_client/src/main.rs | 29 ++++++++++- 3 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 validator_client/src/attester_service/mod.rs diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index f76772f28..cdde71774 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] block_proposer = { path = "../eth2/block_proposer" } +attester = { path = "../eth2/attester" } bls = { path = "../eth2/utils/bls" } clap = "2.32.0" dirs = "1.0.3" diff --git a/validator_client/src/attester_service/mod.rs b/validator_client/src/attester_service/mod.rs new file mode 100644 index 000000000..fbf8bd005 --- /dev/null +++ b/validator_client/src/attester_service/mod.rs @@ -0,0 +1,51 @@ +use attester::{Attester, BeaconNode, DutiesReader, PollOutcome as AttesterPollOutcome, Signer}; +use slog::Logger; +use slot_clock::SlotClock; +use std::time::Duration; + +pub struct AttesterService { + pub attester: Attester, + pub poll_interval_millis: u64, + pub log: Logger, +} + +impl AttesterService { + /// Run a loop which polls the Attester each `poll_interval_millis` millseconds. + /// + /// Logs the results of the polls. + pub fn run(&mut self) { + loop { + match self.attester.poll() { + Err(error) => { + error!(self.log, "Attester poll error"; "error" => format!("{:?}", error)) + } + Ok(AttesterPollOutcome::AttestationProduced(slot)) => { + info!(self.log, "Produced Attestation"; "slot" => slot) + } + Ok(AttesterPollOutcome::SlashableAttestationNotProduced(slot)) => { + warn!(self.log, "Slashable attestation was not produced"; "slot" => slot) + } + Ok(AttesterPollOutcome::AttestationNotRequired(slot)) => { + info!(self.log, "Attestation not required"; "slot" => slot) + } + Ok(AttesterPollOutcome::ProducerDutiesUnknown(slot)) => { + error!(self.log, "Attestation duties unknown"; "slot" => slot) + } + Ok(AttesterPollOutcome::SlotAlreadyProcessed(slot)) => { + warn!(self.log, "Attempted to re-process slot"; "slot" => slot) + } + Ok(AttesterPollOutcome::BeaconNodeUnableToProduceAttestation(slot)) => { + error!(self.log, "Beacon node unable to produce attestation"; "slot" => slot) + } + Ok(AttesterPollOutcome::SignerRejection(slot)) => { + error!(self.log, "The cryptographic signer refused to sign the attestation"; "slot" => slot) + } + Ok(AttesterPollOutcome::ValidatorIsUnknown(slot)) => { + error!(self.log, "The Beacon Node does not recognise the validator"; "slot" => slot) + } + }; + + std::thread::sleep(Duration::from_millis(self.poll_interval_millis)); + } + } +} diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index ebab8538c..2e432ceff 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -1,7 +1,9 @@ use self::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService}; use self::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap}; +use crate::attester_service::AttesterService; use crate::config::ClientConfig; -use block_proposer::{test_utils::LocalSigner, BlockProducer}; +use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester}; +use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer}; use bls::Keypair; use clap::{App, Arg}; use grpcio::{ChannelBuilder, EnvBuilder}; @@ -13,7 +15,9 @@ use std::sync::Arc; use std::thread; use types::ChainSpec; +mod attester_service; mod block_producer_service; + mod config; mod duties; @@ -160,7 +164,7 @@ fn main() { // Spawn a new thread to perform block production for the validator. let producer_thread = { let spec = spec.clone(); - let signer = Arc::new(LocalSigner::new(keypair.clone())); + let signer = Arc::new(BlockProposerLocalSigner::new(keypair.clone())); let duties_map = duties_map.clone(); let slot_clock = slot_clock.clone(); let log = log.clone(); @@ -178,6 +182,27 @@ fn main() { }) }; + //Spawn a new thread for attestation for the validator. + let attester_thread = { + let signer = Arc::new(AttesterLocalSigner::new(keypair.clone())); + let duties_map = duties_map.clone(); + let slot_clock = slot_clock.clone(); + let log = log.clone(); + //TODO: this is wrong, I assume this has to be AttesterGrpcClient, which has to be defined analogous + // to beacon_block_grpc_client.rs + let client = Arc::new(BeaconBlockGrpcClient::new(beacon_block_grpc_client.clone())); + thread::spawn(move || { + let attester = Attester::new(duties_map, slot_clock, client, signer); + let mut attester_service = AttesterService { + attester, + poll_interval_millis, + log, + }; + + block_producer_service.run(); + }) + }; + threads.push((duties_manager_thread, producer_thread)); } From 2215aa4b4640e5bf4a5ace25a184de1dd0ad934c Mon Sep 17 00:00:00 2001 From: thojest Date: Fri, 15 Mar 2019 11:44:39 +0100 Subject: [PATCH 2/3] added protos specification for Attester and created first draft for attestation_grpc_client (lighthouse-255) --- protos/src/services.proto | 47 +++++++++++++++++++ .../attestation_grpc_client.rs | 32 +++++++++++++ validator_client/src/attester_service/mod.rs | 5 +- validator_client/src/main.rs | 21 ++++++--- 4 files changed, 97 insertions(+), 8 deletions(-) create mode 100644 validator_client/src/attester_service/attestation_grpc_client.rs diff --git a/protos/src/services.proto b/protos/src/services.proto index 16e2d4dba..0523cc958 100644 --- a/protos/src/services.proto +++ b/protos/src/services.proto @@ -23,6 +23,11 @@ service ValidatorService { rpc ValidatorIndex(PublicKey) returns (IndexResponse); } +service AttestationService { + rpc ProduceAttestationData (ProduceAttestationDataRequest) returns (ProduceAttestationDataResponse); + rpc PublishAttestationData (PublishAttestationDataRequest) returns (PublishAttestationDataResponse); +} + message BeaconBlock { uint64 slot = 1; bytes block_root = 2; @@ -30,6 +35,30 @@ message BeaconBlock { bytes signature = 4; } +message Crosslink { + uint64 epoch = 1; + bytes crosslink_data_root = 2; + +} + +message AttestationData { + uint64 slot = 1; + uint64 shard = 2; + bytes beacon_block_root = 3; + bytes epoch_boundary_root = 4; + bytes crosslink_data_root = 5; + Crosslink latest_crosslink = 6; + uint64 justified_epoch = 7; + bytes justified_block_root = 8; + +} + +message FreeAttestation { + AttestationData attestation_data = 1; + bytes signature = 2; + uint64 validator_index = 3; +} + // Validator requests an unsigned proposal. message ProduceBeaconBlockRequest { uint64 slot = 1; @@ -51,6 +80,24 @@ message PublishBeaconBlockResponse { bytes msg = 2; } +message ProduceAttestationDataRequest { + uint64 slot = 1; + uint64 shard = 2; +} + +message ProduceAttestationDataResponse { + AttestationData attestation_data = 1; +} + +message PublishAttestationDataRequest { + FreeAttestation free_attestation = 1; +} + +message PublishAttestationDataResponse { + bool success = 1; + bytes msg = 2; +} + // A validators duties for some epoch. // TODO: add shard duties. message ValidatorAssignment { diff --git a/validator_client/src/attester_service/attestation_grpc_client.rs b/validator_client/src/attester_service/attestation_grpc_client.rs new file mode 100644 index 000000000..b3a0bd134 --- /dev/null +++ b/validator_client/src/attester_service/attestation_grpc_client.rs @@ -0,0 +1,32 @@ +use protos::services_grpc::AttestationServiceClient; +use std::sync::Arc; + +use attester::{BeaconNode, BeaconNodeError, PublishOutcome}; +use types::{AttestationData, FreeAttestation, Slot}; + +pub struct AttestationGrpcClient { + client: Arc, +} + +impl AttestationGrpcClient { + pub fn new(client: Arc) -> Self { + Self { client } + } +} + +impl BeaconNode for AttestationGrpcClient { + fn produce_attestation_data( + &self, + slot: Slot, + shard: u64, + ) -> Result, BeaconNodeError> { + Err(BeaconNodeError::DecodeFailure) + } + + fn publish_attestation_data( + &self, + free_attestation: FreeAttestation, + ) -> Result { + Err(BeaconNodeError::DecodeFailure) + } +} diff --git a/validator_client/src/attester_service/mod.rs b/validator_client/src/attester_service/mod.rs index fbf8bd005..fe5de7647 100644 --- a/validator_client/src/attester_service/mod.rs +++ b/validator_client/src/attester_service/mod.rs @@ -1,8 +1,11 @@ +mod attestation_grpc_client; use attester::{Attester, BeaconNode, DutiesReader, PollOutcome as AttesterPollOutcome, Signer}; -use slog::Logger; +use slog::{error, info, warn, Logger}; use slot_clock::SlotClock; use std::time::Duration; +pub use self::attestation_grpc_client::AttestationGrpcClient; + pub struct AttesterService { pub attester: Attester, pub poll_interval_millis: u64, diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index 2e432ceff..60bc76553 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -1,13 +1,15 @@ use self::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService}; use self::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap}; -use crate::attester_service::AttesterService; +use crate::attester_service::{AttestationGrpcClient, AttesterService}; use crate::config::ClientConfig; use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester}; use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer}; use bls::Keypair; use clap::{App, Arg}; use grpcio::{ChannelBuilder, EnvBuilder}; -use protos::services_grpc::{BeaconBlockServiceClient, ValidatorServiceClient}; +use protos::services_grpc::{ + AttestationServiceClient, BeaconBlockServiceClient, ValidatorServiceClient, +}; use slog::{error, info, o, Drain}; use slot_clock::SystemTimeSlotClock; use std::path::PathBuf; @@ -106,6 +108,13 @@ fn main() { Arc::new(ValidatorServiceClient::new(ch)) }; + //Beacon node gRPC attester endpoints. + let attester_grpc_client = { + let env = Arc::new(EnvBuilder::new().build()); + let ch = ChannelBuilder::new(env).connect(&config.server); + Arc::new(AttestationServiceClient::new(ch)) + }; + // Spec let spec = Arc::new(config.spec.clone()); @@ -182,15 +191,13 @@ fn main() { }) }; - //Spawn a new thread for attestation for the validator. + // Spawn a new thread for attestation for the validator. let attester_thread = { let signer = Arc::new(AttesterLocalSigner::new(keypair.clone())); let duties_map = duties_map.clone(); let slot_clock = slot_clock.clone(); let log = log.clone(); - //TODO: this is wrong, I assume this has to be AttesterGrpcClient, which has to be defined analogous - // to beacon_block_grpc_client.rs - let client = Arc::new(BeaconBlockGrpcClient::new(beacon_block_grpc_client.clone())); + let client = Arc::new(AttestationGrpcClient::new(attester_grpc_client.clone())); thread::spawn(move || { let attester = Attester::new(duties_map, slot_clock, client, signer); let mut attester_service = AttesterService { @@ -199,7 +206,7 @@ fn main() { log, }; - block_producer_service.run(); + attester_service.run(); }) }; From d8099ae00c735058a88ea8205bfede7ee8c50683 Mon Sep 17 00:00:00 2001 From: thojest Date: Mon, 18 Mar 2019 21:12:06 +0100 Subject: [PATCH 3/3] started implementing BeaconNode for AttestationGrpcClient; included correct epoch_map for instantiation of Attester (lighthouse-255) --- .../attester_service/attestation_grpc_client.rs | 12 ++++++++++++ validator_client/src/main.rs | 14 ++++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/validator_client/src/attester_service/attestation_grpc_client.rs b/validator_client/src/attester_service/attestation_grpc_client.rs index b3a0bd134..566d74a39 100644 --- a/validator_client/src/attester_service/attestation_grpc_client.rs +++ b/validator_client/src/attester_service/attestation_grpc_client.rs @@ -2,6 +2,7 @@ use protos::services_grpc::AttestationServiceClient; use std::sync::Arc; use attester::{BeaconNode, BeaconNodeError, PublishOutcome}; +use protos::services::ProduceAttestationDataRequest; use types::{AttestationData, FreeAttestation, Slot}; pub struct AttestationGrpcClient { @@ -20,6 +21,16 @@ impl BeaconNode for AttestationGrpcClient { slot: Slot, shard: u64, ) -> Result, BeaconNodeError> { + let mut req = ProduceAttestationDataRequest::new(); + req.set_slot(slot.as_u64()); + req.set_shard(shard); + + let reply = self + .client + .produce_attestation_data(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + // TODO: return correct AttestationData Err(BeaconNodeError::DecodeFailure) } @@ -27,6 +38,7 @@ impl BeaconNode for AttestationGrpcClient { &self, free_attestation: FreeAttestation, ) -> Result { + // TODO: return correct PublishOutcome Err(BeaconNodeError::DecodeFailure) } } diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index 60bc76553..4664d5dc9 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -2,6 +2,7 @@ use self::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService}; use self::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap}; use crate::attester_service::{AttestationGrpcClient, AttesterService}; use crate::config::ClientConfig; +use attester::test_utils::EpochMap; use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester}; use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer}; use bls::Keypair; @@ -19,7 +20,6 @@ use types::ChainSpec; mod attester_service; mod block_producer_service; - mod config; mod duties; @@ -143,6 +143,7 @@ fn main() { for keypair in keypairs { info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id()); let duties_map = Arc::new(EpochDutiesMap::new(spec.slots_per_epoch)); + let epoch_map_for_attester = Arc::new(EpochMap::new(spec.slots_per_epoch)); // Spawn a new thread to maintain the validator's `EpochDuties`. let duties_manager_thread = { @@ -191,15 +192,15 @@ fn main() { }) }; - // Spawn a new thread for attestation for the validator. + // Spawn a new thread for attestation for the validator. let attester_thread = { let signer = Arc::new(AttesterLocalSigner::new(keypair.clone())); - let duties_map = duties_map.clone(); + let epoch_map = epoch_map_for_attester.clone(); let slot_clock = slot_clock.clone(); let log = log.clone(); let client = Arc::new(AttestationGrpcClient::new(attester_grpc_client.clone())); thread::spawn(move || { - let attester = Attester::new(duties_map, slot_clock, client, signer); + let attester = Attester::new(epoch_map, slot_clock, client, signer); let mut attester_service = AttesterService { attester, poll_interval_millis, @@ -210,13 +211,14 @@ fn main() { }) }; - threads.push((duties_manager_thread, producer_thread)); + threads.push((duties_manager_thread, producer_thread, attester_thread)); } // Naively wait for all the threads to complete. for tuple in threads { - let (manager, producer) = tuple; + let (manager, producer, attester) = tuple; let _ = producer.join(); let _ = manager.join(); + let _ = attester.join(); } }