Merge pull request #154 from sigp/grpc-rs

Add validator_node, restructure binaries, gRPC.
This commit is contained in:
Paul Hauner 2019-01-22 15:55:57 +11:00 committed by GitHub
commit 87c73b1af9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
55 changed files with 1390 additions and 78 deletions

View File

@ -1,8 +1,15 @@
language: rust
before_install:
- curl -OL https://github.com/google/protobuf/releases/download/v3.4.0/protoc-3.4.0-linux-x86_64.zip
- unzip protoc-3.4.0-linux-x86_64.zip -d protoc3
- sudo mv protoc3/bin/* /usr/local/bin/
- sudo mv protoc3/include/* /usr/local/include/
- sudo chown $USER /usr/local/bin/protoc
- sudo chown -R $USER /usr/local/include/google
script:
- cargo fmt --all -- --check
- cargo build --verbose --all
- cargo test --verbose --all
- cargo fmt --all -- --check
rust:
- stable
- beta

View File

@ -1,36 +1,3 @@
[package]
name = "lighthouse"
version = "0.0.1"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
[dependencies]
blake2-rfc = "0.2.18"
bls-aggregates = { git = "https://github.com/sigp/signature-schemes" }
bytes = ""
crypto-mac = "^0.6.2"
clap = "2.32.0"
db = { path = "lighthouse/db" }
dirs = "1.0.3"
futures = "0.1.23"
rand = "0.3"
rlp = { git = "https://github.com/paritytech/parity-common" }
slog = "^2.2.3"
slog-term = "^2.4.0"
slog-async = "^2.3.0"
tokio = "0.1"
[dependencies.pairing]
git = "https://github.com/mmaker/pairing"
branch = "feature/hashing"
[patch.crates-io]
ring = { git = "https://github.com/paritytech/ring" }
[[bin]]
path = "lighthouse/main.rs"
name = "lighthouse"
[workspace]
members = [
"beacon_chain/attestation_validation",
@ -47,6 +14,8 @@ members = [
"beacon_chain/utils/vec_shuffle",
"beacon_chain/validator_induction",
"beacon_chain/validator_shuffling",
"lighthouse/beacon_chain",
"lighthouse/db",
"beacon_node",
"beacon_node/db",
"protos",
"validator_client",
]

View File

@ -113,6 +113,10 @@ A few basic steps are needed to get set up:
3. Use the command `rustup show` to get information about the Rust installation. You should see that the active toolchain is the stable version.
4. Run `rustc --version` to check the installation and version of rust.
- Updates can be performed using` rustup update` .
5. Install build dependancies (Arch packages are listed here, your distribution will
likely be similar):
- `clang`: required by RocksDB.
- `protobuf`: required for protobuf serialization (gRPC).
5. Navigate to the working directory.
6. Run the test by using command `cargo test --all` . By running, it will pass all the required test cases. If you are doing it for the first time, then you can grab a coffee meantime. Usually, it takes time to build, compile and pass all test cases. If there is no error then, it means everything is working properly and it's time to get hand's dirty. In case, if there is an error, then please raise the [issue](https://github.com/sigp/lighthouse/issues). We will help you.
7. As an alternative to, or instead of the above step, you may also run benchmarks by using the command `cargo bench --all`

View File

@ -6,7 +6,7 @@ edition = "2018"
[dependencies]
bls = { path = "../utils/bls" }
db = { path = "../../lighthouse/db" }
db = { path = "../../beacon_node/db" }
hashing = { path = "../utils/hashing" }
ssz = { path = "../utils/ssz" }
types = { path = "../types" }

View File

@ -5,6 +5,6 @@ authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
[dependencies]
db = { path = "../../lighthouse/db" }
db = { path = "../../beacon_node/db" }
ssz = { path = "../utils/ssz" }
types = { path = "../types" }

View File

@ -64,3 +64,5 @@ pub type AttesterMap = HashMap<(u64, u64), Vec<usize>>;
/// Maps a slot to a block proposer.
pub type ProposerMap = HashMap<u64, usize>;
pub use bls::{AggregatePublicKey, AggregateSignature, PublicKey, Signature};

View File

@ -7,4 +7,5 @@ edition = "2018"
[dependencies]
bls-aggregates = { git = "https://github.com/sigp/signature-schemes" }
hashing = { path = "../hashing" }
hex = "0.3"
ssz = { path = "../ssz" }

View File

@ -6,7 +6,7 @@ use bls_aggregates::AggregateSignature as RawAggregateSignature;
///
/// This struct is a wrapper upon a base type and provides helper functions (e.g., SSZ
/// serialization).
#[derive(Debug, PartialEq, Clone, Default)]
#[derive(Debug, PartialEq, Clone, Default, Eq)]
pub struct AggregateSignature(RawAggregateSignature);
impl AggregateSignature {

View File

@ -1,6 +1,6 @@
use super::{PublicKey, SecretKey};
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Keypair {
pub sk: SecretKey,
pub pk: PublicKey,

View File

@ -1,13 +1,15 @@
use super::SecretKey;
use bls_aggregates::PublicKey as RawPublicKey;
use ssz::{decode_ssz_list, Decodable, DecodeError, Encodable, SszStream};
use hex::encode as hex_encode;
use ssz::{decode_ssz_list, ssz_encode, Decodable, DecodeError, Encodable, SszStream};
use std::default;
use std::hash::{Hash, Hasher};
/// A single BLS signature.
///
/// This struct is a wrapper upon a base type and provides helper functions (e.g., SSZ
/// serialization).
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, Clone, Eq)]
pub struct PublicKey(RawPublicKey);
impl PublicKey {
@ -19,6 +21,15 @@ impl PublicKey {
pub fn as_raw(&self) -> &RawPublicKey {
&self.0
}
/// Returns the last 6 bytes of the SSZ encoding of the public key, as a hex string.
///
/// Useful for providing a short identifier to the user.
pub fn concatenated_hex_id(&self) -> String {
let bytes = ssz_encode(self);
let end_bytes = &bytes[bytes.len().saturating_sub(6)..bytes.len()];
hex_encode(end_bytes)
}
}
impl default::Default for PublicKey {
@ -42,6 +53,18 @@ impl Decodable for PublicKey {
}
}
impl PartialEq for PublicKey {
fn eq(&self, other: &PublicKey) -> bool {
ssz_encode(self) == ssz_encode(other)
}
}
impl Hash for PublicKey {
fn hash<H: Hasher>(&self, state: &mut H) {
ssz_encode(self).hash(state)
}
}
#[cfg(test)]
mod tests {
use super::super::ssz::ssz_encode;

View File

@ -5,7 +5,7 @@ use ssz::{decode_ssz_list, Decodable, DecodeError, Encodable, SszStream};
///
/// This struct is a wrapper upon a base type and provides helper functions (e.g., SSZ
/// serialization).
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct SecretKey(RawSecretKey);
impl SecretKey {

View File

@ -6,7 +6,7 @@ use bls_aggregates::Signature as RawSignature;
///
/// This struct is a wrapper upon a base type and provides helper functions (e.g., SSZ
/// serialization).
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct Signature(RawSignature);
impl Signature {

View File

@ -4,7 +4,7 @@ mod testing_slot_clock;
pub use crate::system_time_slot_clock::{Error as SystemTimeSlotClockError, SystemTimeSlotClock};
pub use crate::testing_slot_clock::{Error as TestingSlotClockError, TestingSlotClock};
pub trait SlotClock {
pub trait SlotClock: Send + Sync {
type Error;
fn present_slot(&self) -> Result<Option<u64>, Self::Error>;

20
beacon_node/Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "beacon_node"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
[dependencies]
bls = { path = "../beacon_chain/utils/bls" }
grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] }
protobuf = "2.0.2"
protos = { path = "../protos" }
clap = "2.32.0"
db = { path = "db" }
dirs = "1.0.3"
futures = "0.1.23"
slog = "^2.2.3"
slog-term = "^2.4.0"
slog-async = "^2.3.0"
ssz = { path = "../beacon_chain/utils/ssz" }
tokio = "0.1"

View File

@ -1,5 +1,3 @@
extern crate dirs;
use std::fs;
use std::path::PathBuf;

View File

@ -1,20 +1,14 @@
#[macro_use]
extern crate slog;
extern crate slog_async;
extern crate slog_term;
// extern crate ssz;
extern crate clap;
extern crate futures;
extern crate db;
mod config;
mod rpc;
use std::path::PathBuf;
use crate::config::LighthouseConfig;
use crate::rpc::start_server;
use clap::{App, Arg};
use slog::Drain;
use slog::{error, info, o, Drain};
fn main() {
let decorator = slog_term::TermDecorator::new().build();
@ -64,10 +58,9 @@ fn main() {
"data_dir" => &config.data_dir.to_str(),
"port" => &config.p2p_listen_port);
error!(
log,
"Lighthouse under development and does not provide a user demo."
);
let _server = start_server(log.clone());
info!(log, "Exiting.");
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
}
}

View File

@ -0,0 +1,57 @@
use futures::Future;
use grpcio::{RpcContext, UnarySink};
use protos::services::{
BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse,
PublishBeaconBlockRequest, PublishBeaconBlockResponse,
};
use protos::services_grpc::BeaconBlockService;
use slog::Logger;
#[derive(Clone)]
pub struct BeaconBlockServiceInstance {
pub log: Logger,
}
impl BeaconBlockService for BeaconBlockServiceInstance {
/// Produce a `BeaconBlock` for signing by a validator.
fn produce_beacon_block(
&mut self,
ctx: RpcContext,
req: ProduceBeaconBlockRequest,
sink: UnarySink<ProduceBeaconBlockResponse>,
) {
println!("producing at slot {}", req.get_slot());
// TODO: build a legit block.
let mut block = BeaconBlockProto::new();
block.set_slot(req.get_slot());
block.set_block_root("cats".as_bytes().to_vec());
let mut resp = ProduceBeaconBlockResponse::new();
resp.set_block(block);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
/// Accept some fully-formed `BeaconBlock`, process and publish it.
fn publish_beacon_block(
&mut self,
ctx: RpcContext,
req: PublishBeaconBlockRequest,
sink: UnarySink<PublishBeaconBlockResponse>,
) {
println!("publishing {:?}", req.get_block());
// TODO: actually process the block.
let mut resp = PublishBeaconBlockResponse::new();
resp.set_success(true);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}

View File

@ -0,0 +1,36 @@
mod beacon_block;
mod validator;
use self::beacon_block::BeaconBlockServiceInstance;
use self::validator::ValidatorServiceInstance;
use grpcio::{Environment, Server, ServerBuilder};
use protos::services_grpc::{create_beacon_block_service, create_validator_service};
use std::sync::Arc;
use slog::{info, Logger};
pub fn start_server(log: Logger) -> Server {
let log_clone = log.clone();
let env = Arc::new(Environment::new(1));
let beacon_block_service = {
let instance = BeaconBlockServiceInstance { log: log.clone() };
create_beacon_block_service(instance)
};
let validator_service = {
let instance = ValidatorServiceInstance { log: log.clone() };
create_validator_service(instance)
};
let mut server = ServerBuilder::new(env)
.register_service(beacon_block_service)
.register_service(validator_service)
.bind("127.0.0.1", 50_051)
.build()
.unwrap();
server.start();
for &(ref host, port) in server.bind_addrs() {
info!(log_clone, "gRPC listening on {}:{}", host, port);
}
server
}

View File

@ -0,0 +1,64 @@
use bls::PublicKey;
use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use protos::services::{
IndexResponse, ProposeBlockSlotRequest, ProposeBlockSlotResponse, PublicKey as PublicKeyRequest,
};
use protos::services_grpc::ValidatorService;
use slog::{debug, Logger};
use ssz::Decodable;
#[derive(Clone)]
pub struct ValidatorServiceInstance {
pub log: Logger,
}
impl ValidatorService for ValidatorServiceInstance {
fn validator_index(
&mut self,
ctx: RpcContext,
req: PublicKeyRequest,
sink: UnarySink<IndexResponse>,
) {
if let Ok((public_key, _)) = PublicKey::ssz_decode(req.get_public_key(), 0) {
debug!(self.log, "RPC request"; "endpoint" => "ValidatorIndex", "public_key" => public_key.concatenated_hex_id());
let mut resp = IndexResponse::new();
// TODO: return a legit value.
resp.set_index(1);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
} else {
let f = sink
.fail(RpcStatus::new(
RpcStatusCode::InvalidArgument,
Some("Invalid public_key".to_string()),
))
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}
fn propose_block_slot(
&mut self,
ctx: RpcContext,
req: ProposeBlockSlotRequest,
sink: UnarySink<ProposeBlockSlotResponse>,
) {
debug!(self.log, "RPC request"; "endpoint" => "ProposeBlockSlot", "epoch" => req.get_epoch(), "validator_index" => req.get_validator_index());
let mut resp = ProposeBlockSlotResponse::new();
// TODO: return a legit value.
resp.set_slot(1);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}

View File

@ -1,17 +0,0 @@
[package]
name = "chain"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
[dependencies]
bls = { path = "../../beacon_chain/utils/bls" }
db = { path = "../db" }
genesis = { path = "../../beacon_chain/genesis" }
naive_fork_choice = { path = "../../beacon_chain/naive_fork_choice" }
slot_clock = { path = "../../beacon_chain/utils/slot_clock" }
spec = { path = "../../beacon_chain/spec" }
ssz = { path = "../../beacon_chain/utils/ssz" }
types = { path = "../../beacon_chain/types" }
validator_induction = { path = "../../beacon_chain/validator_induction" }
validator_shuffling = { path = "../../beacon_chain/validator_shuffling" }

2
protos/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
src/services.rs
src/services_grpc.rs

14
protos/Cargo.toml Normal file
View File

@ -0,0 +1,14 @@
[package]
name = "protos"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
description = "Google protobuf message and service definitions used in Lighthouse APIs."
[dependencies]
futures = "0.1"
grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] }
protobuf = "2.0"
[build-dependencies]
protoc-grpcio = "0.3.1"

8
protos/build.rs Normal file
View File

@ -0,0 +1,8 @@
extern crate protoc_grpcio;
fn main() {
let proto_root = "src/";
println!("cargo:rerun-if-changed={}", proto_root);
protoc_grpcio::compile_grpc_protos(&["services.proto"], &[proto_root], &proto_root)
.expect("Failed to compile gRPC definitions!");
}

2
protos/src/lib.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod services;
pub mod services_grpc;

94
protos/src/services.proto Normal file
View File

@ -0,0 +1,94 @@
// TODO: This setup requires that the BN (beacon node) holds the block in state
// during the interval between the `GenerateProposalRequest` and the
// `SubmitProposalRequest`.
//
// This is sub-optimal as if a validator client switches BN during this process
// the block will be lost.
//
// This "stateful" method is being used presently because it's easier and
// requires less maintainence as the `BeaconBlock` definition changes.
syntax = "proto3";
package ethereum.beacon.rpc.v1;
service BeaconBlockService {
rpc ProduceBeaconBlock(ProduceBeaconBlockRequest) returns (ProduceBeaconBlockResponse);
rpc PublishBeaconBlock(PublishBeaconBlockRequest) returns (PublishBeaconBlockResponse);
}
service ValidatorService {
// rpc ValidatorAssignment(ValidatorAssignmentRequest) returns (ValidatorAssignmentResponse);
rpc ProposeBlockSlot(ProposeBlockSlotRequest) returns (ProposeBlockSlotResponse);
rpc ValidatorIndex(PublicKey) returns (IndexResponse);
}
message BeaconBlock {
uint64 slot = 1;
bytes block_root = 2;
bytes randao_reveal = 3;
bytes signature = 4;
}
// Validator requests an unsigned proposal.
message ProduceBeaconBlockRequest {
uint64 slot = 1;
}
// Beacon node returns an unsigned proposal.
message ProduceBeaconBlockResponse {
BeaconBlock block = 1;
}
// Validator submits a signed proposal.
message PublishBeaconBlockRequest {
BeaconBlock block = 1;
}
// Beacon node indicates a sucessfully submitted proposal.
message PublishBeaconBlockResponse {
bool success = 1;
bytes msg = 2;
}
// A validators duties for some epoch.
// TODO: add shard duties.
message ValidatorAssignment {
oneof block_production_slot_oneof {
bool block_production_slot_none = 1;
uint64 block_production_slot = 2;
}
}
message ValidatorAssignmentRequest {
uint64 epoch = 1;
bytes validator_index = 2;
}
/*
* Propose slot
*/
message ProposeBlockSlotRequest {
uint64 epoch = 1;
uint64 validator_index = 2;
}
message ProposeBlockSlotResponse {
oneof slot_oneof {
bool none = 1;
uint64 slot = 2;
}
}
/*
* Validator Assignment
*/
message PublicKey {
bytes public_key = 1;
}
message IndexResponse {
uint64 index = 1;
}

View File

@ -0,0 +1,20 @@
[package]
name = "validator_client"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
[dependencies]
bls = { path = "../beacon_chain/utils/bls" }
clap = "2.32.0"
dirs = "1.0.3"
grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] }
protobuf = "2.0.2"
protos = { path = "../protos" }
slot_clock = { path = "../beacon_chain/utils/slot_clock" }
spec = { path = "../beacon_chain/spec" }
types = { path = "../beacon_chain/types" }
slog = "^2.2.3"
slog-term = "^2.4.0"
slog-async = "^2.3.0"
ssz = { path = "../beacon_chain/utils/ssz" }

View File

@ -0,0 +1,67 @@
# Lighthouse Validator Client
The Validator Client (VC) is a stand-alone binary which connects to a Beacon
Node (BN) and fulfils the roles of a validator.
## Roles
The VC is responsible for the following tasks:
- Requesting validator duties (a.k.a. shuffling) from the BN.
- Prompting the BN to produce a new block, when a validators block production
duties require.
- Completing all the fields on a new block (e.g., RANDAO reveal, signature) and
publishing the block to a BN.
- Prompting the BN to produce a new shard atteststation as per a validators
duties.
- Ensuring that no slashable messages are signed by a validator private key.
- Keeping track of the system clock and how it relates to slots/epochs.
The VC is capable of managing multiple validators in the same process tree.
## Implementation
_This section describes the present implementation of this VC binary._
### Services
Each validator is represented by two services, one which tracks the validator
duties and another which performs block production duties.
A separate thread is maintained for each service, for each validator. As such,
a single validator utilises three (3) threads (one for the base VC and two for
each service) and two validators utilise five (5) threads.
#### `DutiesManagerService`
Polls a BN and requests validator responsibilities, as well as a validator
index. The outcome of a successful poll is a `EpochDuties` struct:
```rust
EpochDuties {
validator_index: u64,
block_prodcution_slot: u64,
}
```
This is stored in the `EpochDutiesMap`, a `HashMap` mapping `epoch ->
EpochDuties`.
#### `BlockProducerService`
Polls the system clock and determines if a block needs to be produced. Reads
from the `EpochDutiesMap` maintained by the `DutiesManagerService`.
If block production is required, performs all the necessary duties to request,
complete and return a block from the BN.
### Configuration
Presently the validator specifics (pubkey, etc.) are randomly generated and the
chain specification (slot length, BLS domain, etc.) are fixed to foundation
parameters. This is temporary and will be upgrade so these parameters can be
read from file (or initialized on first-boot).
## BN Communication
The VC communicates with the BN via a gRPC/protobuf connection.

View File

@ -0,0 +1,74 @@
use super::traits::{BeaconNode, BeaconNodeError};
use protos::services::{
BeaconBlock as GrpcBeaconBlock, ProduceBeaconBlockRequest, PublishBeaconBlockRequest,
};
use protos::services_grpc::BeaconBlockServiceClient;
use ssz::{ssz_encode, Decodable};
use types::{BeaconBlock, BeaconBlockBody, Hash256, Signature};
impl BeaconNode for BeaconBlockServiceClient {
/// Request a Beacon Node (BN) to produce a new block at the supplied slot.
///
/// Returns `None` if it is not possible to produce at the supplied slot. For example, if the
/// BN is unable to find a parent block.
fn produce_beacon_block(&self, slot: u64) -> Result<Option<BeaconBlock>, BeaconNodeError> {
let mut req = ProduceBeaconBlockRequest::new();
req.set_slot(slot);
let reply = self
.produce_beacon_block(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
if reply.has_block() {
let block = reply.get_block();
let (signature, _) = Signature::ssz_decode(block.get_signature(), 0)
.map_err(|_| BeaconNodeError::DecodeFailure)?;
// TODO: this conversion is incomplete; fix it.
Ok(Some(BeaconBlock {
slot: block.get_slot(),
parent_root: Hash256::zero(),
state_root: Hash256::zero(),
randao_reveal: Hash256::from(block.get_randao_reveal()),
candidate_pow_receipt_root: Hash256::zero(),
signature,
body: BeaconBlockBody {
proposer_slashings: vec![],
casper_slashings: vec![],
attestations: vec![],
custody_reseeds: vec![],
custody_challenges: vec![],
custody_responses: vec![],
deposits: vec![],
exits: vec![],
},
}))
} else {
Ok(None)
}
}
/// Request a Beacon Node (BN) to publish a block.
///
/// Generally, this will be called after a `produce_beacon_block` call with a block that has
/// been completed (signed) by the validator client.
fn publish_beacon_block(&self, block: BeaconBlock) -> Result<bool, BeaconNodeError> {
let mut req = PublishBeaconBlockRequest::new();
// TODO: this conversion is incomplete; fix it.
let mut grpc_block = GrpcBeaconBlock::new();
grpc_block.set_slot(block.slot);
grpc_block.set_block_root(vec![0]);
grpc_block.set_randao_reveal(block.randao_reveal.to_vec());
grpc_block.set_signature(ssz_encode(&block.signature));
req.set_block(grpc_block);
let reply = self
.publish_beacon_block(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
Ok(reply.get_success())
}
}

View File

@ -0,0 +1,254 @@
mod grpc;
mod service;
#[cfg(test)]
mod test_node;
mod traits;
use self::traits::{BeaconNode, BeaconNodeError};
use super::EpochDutiesMap;
use slot_clock::SlotClock;
use spec::ChainSpec;
use std::sync::{Arc, RwLock};
use types::BeaconBlock;
pub use self::service::BlockProducerService;
#[derive(Debug, PartialEq)]
pub enum PollOutcome {
/// A new block was produced.
BlockProduced(u64),
/// A block was not produced as it would have been slashable.
SlashableBlockNotProduced(u64),
/// The validator duties did not require a block to be produced.
BlockProductionNotRequired(u64),
/// The duties for the present epoch were not found.
ProducerDutiesUnknown(u64),
/// The slot has already been processed, execution was skipped.
SlotAlreadyProcessed(u64),
/// The Beacon Node was unable to produce a block at that slot.
BeaconNodeUnableToProduceBlock(u64),
}
#[derive(Debug, PartialEq)]
pub enum Error {
SlotClockError,
SlotUnknowable,
EpochMapPoisoned,
SlotClockPoisoned,
EpochLengthIsZero,
BeaconNodeError(BeaconNodeError),
}
/// A polling state machine which performs block production duties, based upon some epoch duties
/// (`EpochDutiesMap`) and a concept of time (`SlotClock`).
///
/// Ensures that messages are not slashable.
///
/// Relies upon an external service to keep the `EpochDutiesMap` updated.
pub struct BlockProducer<T: SlotClock, U: BeaconNode> {
pub last_processed_slot: u64,
spec: Arc<ChainSpec>,
epoch_map: Arc<RwLock<EpochDutiesMap>>,
slot_clock: Arc<RwLock<T>>,
beacon_node: Arc<U>,
}
impl<T: SlotClock, U: BeaconNode> BlockProducer<T, U> {
/// Returns a new instance where `last_processed_slot == 0`.
pub fn new(
spec: Arc<ChainSpec>,
epoch_map: Arc<RwLock<EpochDutiesMap>>,
slot_clock: Arc<RwLock<T>>,
beacon_node: Arc<U>,
) -> Self {
Self {
last_processed_slot: 0,
spec,
epoch_map,
slot_clock,
beacon_node,
}
}
}
impl<T: SlotClock, U: BeaconNode> BlockProducer<T, U> {
/// "Poll" to see if the validator is required to take any action.
///
/// The slot clock will be read and any new actions undertaken.
pub fn poll(&mut self) -> Result<PollOutcome, Error> {
let slot = self
.slot_clock
.read()
.map_err(|_| Error::SlotClockPoisoned)?
.present_slot()
.map_err(|_| Error::SlotClockError)?
.ok_or(Error::SlotUnknowable)?;
let epoch = slot
.checked_div(self.spec.epoch_length)
.ok_or(Error::EpochLengthIsZero)?;
// If this is a new slot.
if slot > self.last_processed_slot {
let is_block_production_slot = {
let epoch_map = self.epoch_map.read().map_err(|_| Error::EpochMapPoisoned)?;
match epoch_map.get(&epoch) {
None => return Ok(PollOutcome::ProducerDutiesUnknown(slot)),
Some(duties) => duties.is_block_production_slot(slot),
}
};
if is_block_production_slot {
self.last_processed_slot = slot;
self.produce_block(slot)
} else {
Ok(PollOutcome::BlockProductionNotRequired(slot))
}
} else {
Ok(PollOutcome::SlotAlreadyProcessed(slot))
}
}
/// Produce a block at some slot.
///
/// Assumes that a block is required at this slot (does not check the duties).
///
/// Ensures the message is not slashable.
///
/// !!! UNSAFE !!!
///
/// The slash-protection code is not yet implemented. There is zero protection against
/// slashing.
fn produce_block(&mut self, slot: u64) -> Result<PollOutcome, Error> {
if let Some(block) = self.beacon_node.produce_beacon_block(slot)? {
if self.safe_to_produce(&block) {
let block = self.sign_block(block);
self.beacon_node.publish_beacon_block(block)?;
Ok(PollOutcome::BlockProduced(slot))
} else {
Ok(PollOutcome::SlashableBlockNotProduced(slot))
}
} else {
Ok(PollOutcome::BeaconNodeUnableToProduceBlock(slot))
}
}
/// Consumes a block, returning that block signed by the validators private key.
///
/// Important: this function will not check to ensure the block is not slashable. This must be
/// done upstream.
fn sign_block(&mut self, block: BeaconBlock) -> BeaconBlock {
// TODO: sign the block
// https://github.com/sigp/lighthouse/issues/160
self.store_produce(&block);
block
}
/// Returns `true` if signing a block is safe (non-slashable).
///
/// !!! UNSAFE !!!
///
/// Important: this function is presently stubbed-out. It provides ZERO SAFETY.
fn safe_to_produce(&self, _block: &BeaconBlock) -> bool {
// TODO: ensure the producer doesn't produce slashable blocks.
// https://github.com/sigp/lighthouse/issues/160
true
}
/// Record that a block was produced so that slashable votes may not be made in the future.
///
/// !!! UNSAFE !!!
///
/// Important: this function is presently stubbed-out. It provides ZERO SAFETY.
fn store_produce(&mut self, _block: &BeaconBlock) {
// TODO: record this block production to prevent future slashings.
// https://github.com/sigp/lighthouse/issues/160
}
}
impl From<BeaconNodeError> for Error {
fn from(e: BeaconNodeError) -> Error {
Error::BeaconNodeError(e)
}
}
#[cfg(test)]
mod tests {
use super::test_node::TestBeaconNode;
use super::*;
use crate::duties::EpochDuties;
use slot_clock::TestingSlotClock;
use types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
// TODO: implement more thorough testing.
// https://github.com/sigp/lighthouse/issues/160
//
// These tests should serve as a good example for future tests.
#[test]
pub fn polling() {
let mut rng = XorShiftRng::from_seed([42; 16]);
let spec = Arc::new(ChainSpec::foundation());
let epoch_map = Arc::new(RwLock::new(EpochDutiesMap::new()));
let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0)));
let beacon_node = Arc::new(TestBeaconNode::default());
let mut block_producer = BlockProducer::new(
spec.clone(),
epoch_map.clone(),
slot_clock.clone(),
beacon_node.clone(),
);
// Configure responses from the BeaconNode.
beacon_node.set_next_produce_result(Ok(Some(BeaconBlock::random_for_test(&mut rng))));
beacon_node.set_next_publish_result(Ok(true));
// Setup some valid duties for the validator
let produce_slot = 100;
let duties = EpochDuties {
block_production_slot: Some(produce_slot),
..std::default::Default::default()
};
let produce_epoch = produce_slot / spec.epoch_length;
epoch_map.write().unwrap().insert(produce_epoch, duties);
// One slot before production slot...
slot_clock.write().unwrap().set_slot(produce_slot - 1);
assert_eq!(
block_producer.poll(),
Ok(PollOutcome::BlockProductionNotRequired(produce_slot - 1))
);
// On the produce slot...
slot_clock.write().unwrap().set_slot(produce_slot);
assert_eq!(
block_producer.poll(),
Ok(PollOutcome::BlockProduced(produce_slot))
);
// Trying the same produce slot again...
slot_clock.write().unwrap().set_slot(produce_slot);
assert_eq!(
block_producer.poll(),
Ok(PollOutcome::SlotAlreadyProcessed(produce_slot))
);
// One slot after the produce slot...
slot_clock.write().unwrap().set_slot(produce_slot + 1);
assert_eq!(
block_producer.poll(),
Ok(PollOutcome::BlockProductionNotRequired(produce_slot + 1))
);
// In an epoch without known duties...
let slot = (produce_epoch + 1) * spec.epoch_length;
slot_clock.write().unwrap().set_slot(slot);
assert_eq!(
block_producer.poll(),
Ok(PollOutcome::ProducerDutiesUnknown(slot))
);
}
}

View File

@ -0,0 +1,45 @@
use super::traits::BeaconNode;
use super::{BlockProducer, PollOutcome as BlockProducerPollOutcome, SlotClock};
use slog::{error, info, warn, Logger};
use std::time::Duration;
pub struct BlockProducerService<T: SlotClock, U: BeaconNode> {
pub block_producer: BlockProducer<T, U>,
pub poll_interval_millis: u64,
pub log: Logger,
}
impl<T: SlotClock, U: BeaconNode> BlockProducerService<T, U> {
/// Run a loop which polls the block producer each `poll_interval_millis` millseconds.
///
/// Logs the results of the polls.
pub fn run(&mut self) {
loop {
match self.block_producer.poll() {
Err(error) => {
error!(self.log, "Block producer poll error"; "error" => format!("{:?}", error))
}
Ok(BlockProducerPollOutcome::BlockProduced(slot)) => {
info!(self.log, "Produced block"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::SlashableBlockNotProduced(slot)) => {
warn!(self.log, "Slashable block was not signed"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::BlockProductionNotRequired(slot)) => {
info!(self.log, "Block production not required"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::ProducerDutiesUnknown(slot)) => {
error!(self.log, "Block production duties unknown"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::SlotAlreadyProcessed(slot)) => {
warn!(self.log, "Attempted to re-process slot"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::BeaconNodeUnableToProduceBlock(slot)) => {
error!(self.log, "Beacon node unable to produce block"; "slot" => slot)
}
};
std::thread::sleep(Duration::from_millis(self.poll_interval_millis));
}
}
}

View File

@ -0,0 +1,47 @@
use super::traits::{BeaconNode, BeaconNodeError};
use std::sync::RwLock;
use types::BeaconBlock;
type ProduceResult = Result<Option<BeaconBlock>, BeaconNodeError>;
type PublishResult = Result<bool, BeaconNodeError>;
/// A test-only struct used to simulate a Beacon Node.
#[derive(Default)]
pub struct TestBeaconNode {
pub produce_input: RwLock<Option<u64>>,
pub produce_result: RwLock<Option<ProduceResult>>,
pub publish_input: RwLock<Option<BeaconBlock>>,
pub publish_result: RwLock<Option<PublishResult>>,
}
impl TestBeaconNode {
/// Set the result to be returned when `produce_beacon_block` is called.
pub fn set_next_produce_result(&self, result: ProduceResult) {
*self.produce_result.write().unwrap() = Some(result);
}
/// Set the result to be returned when `publish_beacon_block` is called.
pub fn set_next_publish_result(&self, result: PublishResult) {
*self.publish_result.write().unwrap() = Some(result);
}
}
impl BeaconNode for TestBeaconNode {
/// Returns the value specified by the `set_next_produce_result`.
fn produce_beacon_block(&self, slot: u64) -> ProduceResult {
*self.produce_input.write().unwrap() = Some(slot);
match *self.produce_result.read().unwrap() {
Some(ref r) => r.clone(),
None => panic!("TestBeaconNode: produce_result == None"),
}
}
/// Returns the value specified by the `set_next_publish_result`.
fn publish_beacon_block(&self, block: BeaconBlock) -> PublishResult {
*self.publish_input.write().unwrap() = Some(block);
match *self.publish_result.read().unwrap() {
Some(ref r) => r.clone(),
None => panic!("TestBeaconNode: publish_result == None"),
}
}
}

View File

@ -0,0 +1,19 @@
use types::BeaconBlock;
#[derive(Debug, PartialEq, Clone)]
pub enum BeaconNodeError {
RemoteFailure(String),
DecodeFailure,
}
/// Defines the methods required to produce and publish blocks on a Beacon Node.
pub trait BeaconNode: Send + Sync {
/// Request that the node produces a block.
///
/// Returns Ok(None) if the Beacon Node is unable to produce at the given slot.
fn produce_beacon_block(&self, slot: u64) -> Result<Option<BeaconBlock>, BeaconNodeError>;
/// Request that the node publishes a block.
///
/// Returns `true` if the publish was sucessful.
fn publish_beacon_block(&self, block: BeaconBlock) -> Result<bool, BeaconNodeError>;
}

View File

@ -0,0 +1,25 @@
use std::fs;
use std::path::PathBuf;
/// Stores the core configuration for this validator instance.
#[derive(Clone)]
pub struct ClientConfig {
pub data_dir: PathBuf,
pub server: String,
}
const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse-validators";
impl ClientConfig {
/// Build a new configuration from defaults.
pub fn default() -> Self {
let data_dir = {
let home = dirs::home_dir().expect("Unable to determine home dir.");
home.join(DEFAULT_LIGHTHOUSE_DIR)
};
fs::create_dir_all(&data_dir)
.unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir));
let server = "localhost:50051".to_string();
Self { data_dir, server }
}
}

View File

@ -0,0 +1,49 @@
use super::traits::{BeaconNode, BeaconNodeError};
use super::EpochDuties;
use protos::services::{ProposeBlockSlotRequest, PublicKey as IndexRequest};
use protos::services_grpc::ValidatorServiceClient;
use ssz::ssz_encode;
use types::PublicKey;
impl BeaconNode for ValidatorServiceClient {
/// Request the shuffling from the Beacon Node (BN).
///
/// As this function takes a `PublicKey`, it will first attempt to resolve the public key into
/// a validator index, then call the BN for production/attestation duties.
///
/// Note: presently only block production information is returned.
fn request_shuffling(
&self,
epoch: u64,
public_key: &PublicKey,
) -> Result<Option<EpochDuties>, BeaconNodeError> {
// Lookup the validator index for the supplied public key.
let validator_index = {
let mut req = IndexRequest::new();
req.set_public_key(ssz_encode(public_key).to_vec());
let resp = self
.validator_index(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
resp.get_index()
};
let mut req = ProposeBlockSlotRequest::new();
req.set_validator_index(validator_index);
req.set_epoch(epoch);
let reply = self
.propose_block_slot(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
let block_production_slot = if reply.has_slot() {
Some(reply.get_slot())
} else {
None
};
Ok(Some(EpochDuties {
validator_index,
block_production_slot,
}))
}
}

View File

@ -0,0 +1,179 @@
mod grpc;
mod service;
#[cfg(test)]
mod test_node;
mod traits;
use self::traits::{BeaconNode, BeaconNodeError};
use bls::PublicKey;
use slot_clock::SlotClock;
use spec::ChainSpec;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
pub use self::service::DutiesManagerService;
/// The information required for a validator to propose and attest during some epoch.
///
/// Generally obtained from a Beacon Node, this information contains the validators canonical index
/// (thier sequence in the global validator induction process) and the "shuffling" for that index
/// for some epoch.
#[derive(Debug, PartialEq, Clone, Copy, Default)]
pub struct EpochDuties {
pub validator_index: u64,
pub block_production_slot: Option<u64>,
// Future shard info
}
impl EpochDuties {
/// Returns `true` if the supplied `slot` is a slot in which the validator should produce a
/// block.
pub fn is_block_production_slot(&self, slot: u64) -> bool {
match self.block_production_slot {
Some(s) if s == slot => true,
_ => false,
}
}
}
/// Maps an `epoch` to some `EpochDuties` for a single validator.
pub type EpochDutiesMap = HashMap<u64, EpochDuties>;
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum PollOutcome {
/// The `EpochDuties` were not updated during this poll.
NoChange(u64),
/// The `EpochDuties` for the `epoch` were previously unknown, but obtained in the poll.
NewDuties(u64, EpochDuties),
/// New `EpochDuties` were obtained, different to those which were previously known. This is
/// likely to be the result of chain re-organisation.
DutiesChanged(u64, EpochDuties),
/// The Beacon Node was unable to return the duties as the validator is unknown, or the
/// shuffling for the epoch is unknown.
UnknownValidatorOrEpoch(u64),
}
#[derive(Debug, PartialEq)]
pub enum Error {
SlotClockError,
SlotUnknowable,
EpochMapPoisoned,
SlotClockPoisoned,
EpochLengthIsZero,
BeaconNodeError(BeaconNodeError),
}
/// A polling state machine which ensures the latest `EpochDuties` are obtained from the Beacon
/// Node.
///
/// There is a single `DutiesManager` per validator instance.
pub struct DutiesManager<T: SlotClock, U: BeaconNode> {
pub duties_map: Arc<RwLock<EpochDutiesMap>>,
/// The validator's public key.
pub pubkey: PublicKey,
pub spec: Arc<ChainSpec>,
pub slot_clock: Arc<RwLock<T>>,
pub beacon_node: Arc<U>,
}
impl<T: SlotClock, U: BeaconNode> DutiesManager<T, U> {
/// Poll the Beacon Node for `EpochDuties`.
///
/// The present `epoch` will be learned from the supplied `SlotClock`. In production this will
/// be a wall-clock (e.g., system time, remote server time, etc.).
pub fn poll(&self) -> Result<PollOutcome, Error> {
let slot = self
.slot_clock
.read()
.map_err(|_| Error::SlotClockPoisoned)?
.present_slot()
.map_err(|_| Error::SlotClockError)?
.ok_or(Error::SlotUnknowable)?;
let epoch = slot
.checked_div(self.spec.epoch_length)
.ok_or(Error::EpochLengthIsZero)?;
if let Some(duties) = self.beacon_node.request_shuffling(epoch, &self.pubkey)? {
let mut map = self
.duties_map
.write()
.map_err(|_| Error::EpochMapPoisoned)?;
// If these duties were known, check to see if they're updates or identical.
let result = if let Some(known_duties) = map.get(&epoch) {
if *known_duties == duties {
Ok(PollOutcome::NoChange(epoch))
} else {
Ok(PollOutcome::DutiesChanged(epoch, duties))
}
} else {
Ok(PollOutcome::NewDuties(epoch, duties))
};
map.insert(epoch, duties);
result
} else {
Ok(PollOutcome::UnknownValidatorOrEpoch(epoch))
}
}
}
impl From<BeaconNodeError> for Error {
fn from(e: BeaconNodeError) -> Error {
Error::BeaconNodeError(e)
}
}
#[cfg(test)]
mod tests {
use super::test_node::TestBeaconNode;
use super::*;
use bls::Keypair;
use slot_clock::TestingSlotClock;
// TODO: implement more thorough testing.
// https://github.com/sigp/lighthouse/issues/160
//
// These tests should serve as a good example for future tests.
#[test]
pub fn polling() {
let spec = Arc::new(ChainSpec::foundation());
let duties_map = Arc::new(RwLock::new(EpochDutiesMap::new()));
let keypair = Keypair::random();
let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0)));
let beacon_node = Arc::new(TestBeaconNode::default());
let manager = DutiesManager {
spec: spec.clone(),
pubkey: keypair.pk.clone(),
duties_map: duties_map.clone(),
slot_clock: slot_clock.clone(),
beacon_node: beacon_node.clone(),
};
// Configure response from the BeaconNode.
let duties = EpochDuties {
validator_index: 0,
block_production_slot: Some(10),
};
beacon_node.set_next_shuffling_result(Ok(Some(duties)));
// Get the duties for the first time...
assert_eq!(manager.poll(), Ok(PollOutcome::NewDuties(0, duties)));
// Get the same duties again...
assert_eq!(manager.poll(), Ok(PollOutcome::NoChange(0)));
// Return new duties.
let duties = EpochDuties {
validator_index: 0,
block_production_slot: Some(11),
};
beacon_node.set_next_shuffling_result(Ok(Some(duties)));
assert_eq!(manager.poll(), Ok(PollOutcome::DutiesChanged(0, duties)));
// Return no duties.
beacon_node.set_next_shuffling_result(Ok(None));
assert_eq!(manager.poll(), Ok(PollOutcome::UnknownValidatorOrEpoch(0)));
}
}

View File

@ -0,0 +1,40 @@
use super::traits::BeaconNode;
use super::{DutiesManager, PollOutcome};
use slog::{debug, error, info, Logger};
use slot_clock::SlotClock;
use std::time::Duration;
pub struct DutiesManagerService<T: SlotClock, U: BeaconNode> {
pub manager: DutiesManager<T, U>,
pub poll_interval_millis: u64,
pub log: Logger,
}
impl<T: SlotClock, U: BeaconNode> DutiesManagerService<T, U> {
/// Run a loop which polls the manager each `poll_interval_millis` milliseconds.
///
/// Logs the results of the polls.
pub fn run(&mut self) {
loop {
match self.manager.poll() {
Err(error) => {
error!(self.log, "Epoch duties poll error"; "error" => format!("{:?}", error))
}
Ok(PollOutcome::NoChange(epoch)) => {
debug!(self.log, "No change in duties"; "epoch" => epoch)
}
Ok(PollOutcome::DutiesChanged(epoch, duties)) => {
info!(self.log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(PollOutcome::NewDuties(epoch, duties)) => {
info!(self.log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(PollOutcome::UnknownValidatorOrEpoch(epoch)) => {
error!(self.log, "Epoch or validator unknown"; "epoch" => epoch)
}
};
std::thread::sleep(Duration::from_millis(self.poll_interval_millis));
}
}
}

View File

@ -0,0 +1,31 @@
use super::traits::{BeaconNode, BeaconNodeError};
use super::EpochDuties;
use bls::PublicKey;
use std::sync::RwLock;
type ShufflingResult = Result<Option<EpochDuties>, BeaconNodeError>;
/// A test-only struct used to simulate a Beacon Node.
#[derive(Default)]
pub struct TestBeaconNode {
pub request_shuffling_input: RwLock<Option<(u64, PublicKey)>>,
pub request_shuffling_result: RwLock<Option<ShufflingResult>>,
}
impl TestBeaconNode {
/// Set the result to be returned when `request_shuffling` is called.
pub fn set_next_shuffling_result(&self, result: ShufflingResult) {
*self.request_shuffling_result.write().unwrap() = Some(result);
}
}
impl BeaconNode for TestBeaconNode {
/// Returns the value specified by the `set_next_shuffling_result`.
fn request_shuffling(&self, epoch: u64, public_key: &PublicKey) -> ShufflingResult {
*self.request_shuffling_input.write().unwrap() = Some((epoch, public_key.clone()));
match *self.request_shuffling_result.read().unwrap() {
Some(ref r) => r.clone(),
None => panic!("TestBeaconNode: produce_result == None"),
}
}
}

View File

@ -0,0 +1,19 @@
use super::EpochDuties;
use bls::PublicKey;
#[derive(Debug, PartialEq, Clone)]
pub enum BeaconNodeError {
RemoteFailure(String),
}
/// Defines the methods required to obtain a validators shuffling from a Beacon Node.
pub trait BeaconNode: Send + Sync {
/// Get the shuffling for the given epoch and public key.
///
/// Returns Ok(None) if the public key is unknown, or the shuffling for that epoch is unknown.
fn request_shuffling(
&self,
epoch: u64,
public_key: &PublicKey,
) -> Result<Option<EpochDuties>, BeaconNodeError>;
}

View File

@ -0,0 +1,166 @@
use self::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap};
use crate::block_producer::{BlockProducer, BlockProducerService};
use crate::config::ClientConfig;
use bls::Keypair;
use clap::{App, Arg};
use grpcio::{ChannelBuilder, EnvBuilder};
use protos::services_grpc::{BeaconBlockServiceClient, ValidatorServiceClient};
use slog::{error, info, o, Drain};
use slot_clock::SystemTimeSlotClock;
use spec::ChainSpec;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::thread;
mod block_producer;
mod config;
mod duties;
fn main() {
// Logging
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::CompactFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let log = slog::Logger::root(drain, o!());
// CLI
let matches = App::new("Lighthouse Validator Client")
.version("0.0.1")
.author("Sigma Prime <contact@sigmaprime.io>")
.about("Eth 2.0 Validator Client")
.arg(
Arg::with_name("datadir")
.long("datadir")
.value_name("DIR")
.help("Data directory for keys and databases.")
.takes_value(true),
)
.arg(
Arg::with_name("server")
.long("server")
.value_name("server")
.help("Address to connect to BeaconNode.")
.takes_value(true),
)
.get_matches();
let mut config = ClientConfig::default();
// Custom datadir
if let Some(dir) = matches.value_of("datadir") {
config.data_dir = PathBuf::from(dir.to_string());
}
// Custom server port
if let Some(server_str) = matches.value_of("server") {
if let Ok(addr) = server_str.parse::<u16>() {
config.server = addr.to_string();
} else {
error!(log, "Invalid address"; "server" => server_str);
return;
}
}
// Log configuration
info!(log, "";
"data_dir" => &config.data_dir.to_str(),
"server" => &config.server);
// Beacon node gRPC beacon block endpoints.
let beacon_block_grpc_client = {
let env = Arc::new(EnvBuilder::new().build());
let ch = ChannelBuilder::new(env).connect(&config.server);
Arc::new(BeaconBlockServiceClient::new(ch))
};
// Beacon node gRPC validator endpoints.
let validator_grpc_client = {
let env = Arc::new(EnvBuilder::new().build());
let ch = ChannelBuilder::new(env).connect(&config.server);
Arc::new(ValidatorServiceClient::new(ch))
};
// Ethereum
//
// TODO: Permit loading a custom spec from file.
// https://github.com/sigp/lighthouse/issues/160
let spec = Arc::new(ChainSpec::foundation());
// Clock for determining the present slot.
let slot_clock = {
info!(log, "Genesis time"; "unix_epoch_seconds" => spec.genesis_time);
let clock = SystemTimeSlotClock::new(spec.genesis_time, spec.slot_duration)
.expect("Unable to instantiate SystemTimeSlotClock.");
Arc::new(RwLock::new(clock))
};
let poll_interval_millis = spec.slot_duration * 1000 / 10; // 10% epoch time precision.
info!(log, "Starting block producer service"; "polls_per_epoch" => spec.slot_duration * 1000 / poll_interval_millis);
/*
* Start threads.
*/
let mut threads = vec![];
// TODO: keypairs are randomly generated; they should be loaded from a file or generated.
// https://github.com/sigp/lighthouse/issues/160
let keypairs = vec![Keypair::random()];
for keypair in keypairs {
info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id());
let duties_map = Arc::new(RwLock::new(EpochDutiesMap::new()));
// Spawn a new thread to maintain the validator's `EpochDuties`.
let duties_manager_thread = {
let spec = spec.clone();
let duties_map = duties_map.clone();
let slot_clock = slot_clock.clone();
let log = log.clone();
let beacon_node = validator_grpc_client.clone();
let pubkey = keypair.pk.clone();
thread::spawn(move || {
let manager = DutiesManager {
duties_map,
pubkey,
spec,
slot_clock,
beacon_node,
};
let mut duties_manager_service = DutiesManagerService {
manager,
poll_interval_millis,
log,
};
duties_manager_service.run();
})
};
// Spawn a new thread to perform block production for the validator.
let producer_thread = {
let spec = spec.clone();
let duties_map = duties_map.clone();
let slot_clock = slot_clock.clone();
let log = log.clone();
let client = beacon_block_grpc_client.clone();
thread::spawn(move || {
let block_producer = BlockProducer::new(spec, duties_map, slot_clock, client);
let mut block_producer_service = BlockProducerService {
block_producer,
poll_interval_millis,
log,
};
block_producer_service.run();
})
};
threads.push((duties_manager_thread, producer_thread));
}
// Naively wait for all the threads to complete.
for tuple in threads {
let (manager, producer) = tuple;
let _ = producer.join();
let _ = manager.join();
}
}