Web3Signer support for VC (#2522)

[EIP-3030]: https://eips.ethereum.org/EIPS/eip-3030
[Web3Signer]: https://consensys.github.io/web3signer/web3signer-eth2.html

## Issue Addressed

Resolves #2498

## Proposed Changes

Allows the VC to call out to a [Web3Signer] remote signer to obtain signatures.


## Additional Info

### Making Signing Functions `async`

To allow remote signing, I needed to make all the signing functions `async`. This caused a bit of noise where I had to convert iterators into `for` loops.

In `duties_service.rs` there was a particularly tricky case where we couldn't hold a write-lock across an `await`, so I had to first take a read-lock, then grab a write-lock.

### Move Signing from Core Executor

Whilst implementing this feature, I noticed that we signing was happening on the core tokio executor. I suspect this was causing the executor to temporarily lock and occasionally trigger some HTTP timeouts (and potentially SQL pool timeouts, but I can't verify this). Since moving all signing into blocking tokio tasks, I noticed a distinct drop in the "atttestations_http_get" metric on a Prater node:

![http_get_times](https://user-images.githubusercontent.com/6660660/132143737-82fd3836-2e7e-445b-a143-cb347783baad.png)

I think this graph indicates that freeing the core executor allows the VC to operate more smoothly.

### Refactor TaskExecutor

I noticed that the `TaskExecutor::spawn_blocking_handle` function would fail to spawn tasks if it were unable to obtain handles to some metrics (this can happen if the same metric is defined twice). It seemed that a more sensible approach would be to keep spawning tasks, but without metrics. To that end, I refactored the function so that it would still function without metrics. There are no other changes made.

## TODO

- [x] Restructure to support multiple signing methods.
- [x] Add calls to remote signer from VC.
- [x] Documentation
- [x] Test all endpoints
- [x] Test HTTPS certificate
- [x] Allow adding remote signer validators via the API
- [x] Add Altair support via [21.8.1-rc1](https://github.com/ConsenSys/web3signer/releases/tag/21.8.1-rc1)
- [x] Create issue to start using latest version of web3signer. (See #2570)

## Notes

- ~~Web3Signer doesn't yet support the Altair fork for Prater. See https://github.com/ConsenSys/web3signer/issues/423.~~
- ~~There is not yet a release of Web3Signer which supports Altair blocks. See https://github.com/ConsenSys/web3signer/issues/391.~~
This commit is contained in:
Paul Hauner 2021-09-16 03:26:33 +00:00
parent 58012f85e1
commit c5c7476518
37 changed files with 2236 additions and 478 deletions

27
Cargo.lock generated
View File

@ -6972,6 +6972,7 @@ dependencies = [
"parking_lot", "parking_lot",
"rand 0.7.3", "rand 0.7.3",
"rayon", "rayon",
"reqwest",
"ring", "ring",
"safe_arith", "safe_arith",
"scrypt", "scrypt",
@ -6990,6 +6991,7 @@ dependencies = [
"tokio", "tokio",
"tree_hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "tree_hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"types", "types",
"url",
"validator_dir", "validator_dir",
"warp", "warp",
"warp_utils", "warp_utils",
@ -7295,6 +7297,31 @@ dependencies = [
"url", "url",
] ]
[[package]]
name = "web3signer_tests"
version = "0.1.0"
dependencies = [
"account_utils",
"environment",
"eth2_keystore",
"eth2_network_config",
"exit-future",
"futures",
"reqwest",
"serde",
"serde_derive",
"serde_json",
"serde_yaml",
"slot_clock",
"task_executor",
"tempfile",
"tokio",
"types",
"url",
"validator_client",
"zip",
]
[[package]] [[package]]
name = "webpki" name = "webpki"
version = "0.21.4" version = "0.21.4"

View File

@ -76,6 +76,7 @@ members = [
"testing/node_test_rig", "testing/node_test_rig",
"testing/simulator", "testing/simulator",
"testing/state_transition_vectors", "testing/state_transition_vectors",
"testing/web3signer_tests",
"validator_client", "validator_client",
"validator_client/slashing_protection", "validator_client/slashing_protection",

View File

@ -32,6 +32,7 @@
* [Advanced Usage](./advanced.md) * [Advanced Usage](./advanced.md)
* [Custom Data Directories](./advanced-datadir.md) * [Custom Data Directories](./advanced-datadir.md)
* [Validator Graffiti](./graffiti.md) * [Validator Graffiti](./graffiti.md)
* [Remote Signing with Web3Signer](./validator-web3signer.md)
* [Database Configuration](./advanced_database.md) * [Database Configuration](./advanced_database.md)
* [Advanced Networking](./advanced_networking.md) * [Advanced Networking](./advanced_networking.md)
* [Running a Slasher](./slasher.md) * [Running a Slasher](./slasher.md)

View File

@ -434,3 +434,43 @@ Typical Responses | 200
] ]
} }
``` ```
## `POST /lighthouse/validators/web3signer`
Create any number of new validators, all of which will refer to a
[Web3Signer](https://docs.web3signer.consensys.net/en/latest/) server for signing.
### HTTP Specification
| Property | Specification |
| --- |--- |
Path | `/lighthouse/validators/web3signer`
Method | POST
Required Headers | [`Authorization`](./api-vc-auth-header.md)
Typical Responses | 200, 400
### Example Request Body
```json
[
{
"enable": true,
"description": "validator_one",
"graffiti": "Mr F was here",
"voting_public_key": "0xa062f95fee747144d5e511940624bc6546509eeaeae9383257a9c43e7ddc58c17c2bab4ae62053122184c381b90db380",
"url": "http://path-to-web3signer.com",
"root_certificate_path": "/path/on/vc/filesystem/to/certificate.pem",
"request_timeout_ms": 12000
}
]
```
The following fields may be omitted or nullified to obtain default values:
- `graffiti`
- `root_certificate_path`
- `request_timeout_ms`
### Example Response Body
*No data is included in the response body.*

View File

@ -0,0 +1,56 @@
# Remote Signing with Web3Signer
[Web3Signer]: https://docs.web3signer.consensys.net/en/latest/
[Consensys]: https://github.com/ConsenSys/
[Teku]: https://github.com/consensys/teku
[Web3Signer] is a tool by Consensys which allows *remote signing*. Remote signing is when a
Validator Client (VC) out-sources the signing of messages to remote server (e.g., via HTTPS). This
means that the VC does not hold the validator private keys.
## Warnings
Using a remote signer comes with risks, please read the following two warnings before proceeding:
### Remote signing is complex and risky
Remote signing is generally only desirable for enterprise users or users with unique security
requirements. Most users will find the separation between the Beacon Node (BN) and VC to be
sufficient *without* introducing a remote signer.
**Using a remote signer introduces a new set of security and slashing risks and should only be
undertaken by advanced users who fully understand the risks.**
### Web3Signer is not maintained by Lighthouse
The [Web3Signer] tool is maintained by [Consensys], the same team that maintains [Teku]. The
Lighthouse team (Sigma Prime) does not maintain Web3Signer or make any guarantees about its safety
or effectiveness.
## Usage
A remote signing validator is added to Lighthouse in much the same way as one that uses a local
keystore, via the [`validator_definitions.yml`](./validator-management.md) file or via the `POST
/lighthouse/validators/web3signer` API endpoint.
Here is an example of a `validator_definitions.yml` file containing one validator which uses a
remote signer:
```yaml
---
- enabled: true
voting_public_key: "0xa5566f9ec3c6e1fdf362634ebec9ef7aceb0e460e5079714808388e5d48f4ae1e12897fed1bea951c17fa389d511e477"
type: web3signer
url: "https://my-remote-signer.com:1234"
root_certificate_path: /home/paul/my-certificates/my-remote-signer.pem
```
When using this file, the Lighthouse VC will perform duties for the `0xa5566..` validator and defer
to the `https://my-remote-signer.com:1234` server to obtain any signatures. It will load a
"self-signed" SSL certificate from `/home/paul/my-certificates/my-remote-signer.pem` (on the
filesystem of the VC) to encrypt the communications between the VC and Web3Signer.
> The `request_timeout_ms` key can also be specified. Use this key to override the default timeout
> with a new timeout in milliseconds. This is the timeout before requests to Web3Signer are
> considered to be failures. Setting a value that it too-long may create contention and late duties
> in the VC. Setting it too short will result in failed signatures and therefore missed duties.

View File

@ -61,6 +61,21 @@ pub enum SigningDefinition {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
voting_keystore_password: Option<ZeroizeString>, voting_keystore_password: Option<ZeroizeString>,
}, },
/// A validator that defers to a Web3Signer HTTP server for signing.
///
/// https://github.com/ConsenSys/web3signer
#[serde(rename = "web3signer")]
Web3Signer {
url: String,
/// Path to a .pem file.
#[serde(skip_serializing_if = "Option::is_none")]
root_certificate_path: Option<PathBuf>,
/// Specifies a request timeout.
///
/// The timeout is applied from when the request starts connecting until the response body has finished.
#[serde(skip_serializing_if = "Option::is_none")]
request_timeout_ms: Option<u64>,
},
} }
/// A validator that may be initialized by this validator client. /// A validator that may be initialized by this validator client.
@ -116,6 +131,12 @@ impl ValidatorDefinition {
#[derive(Default, Serialize, Deserialize)] #[derive(Default, Serialize, Deserialize)]
pub struct ValidatorDefinitions(Vec<ValidatorDefinition>); pub struct ValidatorDefinitions(Vec<ValidatorDefinition>);
impl From<Vec<ValidatorDefinition>> for ValidatorDefinitions {
fn from(vec: Vec<ValidatorDefinition>) -> Self {
Self(vec)
}
}
impl ValidatorDefinitions { impl ValidatorDefinitions {
/// Open an existing file or create a new, empty one if it does not exist. /// Open an existing file or create a new, empty one if it does not exist.
pub fn open_or_create<P: AsRef<Path>>(validators_dir: P) -> Result<Self, Error> { pub fn open_or_create<P: AsRef<Path>>(validators_dir: P) -> Result<Self, Error> {
@ -167,11 +188,13 @@ impl ValidatorDefinitions {
let known_paths: HashSet<&PathBuf> = self let known_paths: HashSet<&PathBuf> = self
.0 .0
.iter() .iter()
.map(|def| match &def.signing_definition { .filter_map(|def| match &def.signing_definition {
SigningDefinition::LocalKeystore { SigningDefinition::LocalKeystore {
voting_keystore_path, voting_keystore_path,
.. ..
} => voting_keystore_path, } => Some(voting_keystore_path),
// A Web3Signer validator does not use a local keystore file.
SigningDefinition::Web3Signer { .. } => None,
}) })
.collect(); .collect();

View File

@ -313,6 +313,22 @@ impl ValidatorClientHttpClient {
self.post(path, &request).await self.post(path, &request).await
} }
/// `POST lighthouse/validators/web3signer`
pub async fn post_lighthouse_validators_web3signer(
&self,
request: &[Web3SignerValidatorRequest],
) -> Result<GenericResponse<ValidatorData>, Error> {
let mut path = self.server.full.clone();
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("lighthouse")
.push("validators")
.push("web3signer");
self.post(path, &request).await
}
/// `PATCH lighthouse/validators/{validator_pubkey}` /// `PATCH lighthouse/validators/{validator_pubkey}`
pub async fn patch_lighthouse_validators( pub async fn patch_lighthouse_validators(
&self, &self,

View File

@ -2,6 +2,7 @@ use account_utils::ZeroizeString;
use eth2_keystore::Keystore; use eth2_keystore::Keystore;
use graffiti::GraffitiString; use graffiti::GraffitiString;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::PathBuf;
pub use crate::lighthouse::Health; pub use crate::lighthouse::Health;
pub use crate::types::{GenericResponse, VersionData}; pub use crate::types::{GenericResponse, VersionData};
@ -64,3 +65,20 @@ pub struct KeystoreValidatorsPostRequest {
pub keystore: Keystore, pub keystore: Keystore,
pub graffiti: Option<GraffitiString>, pub graffiti: Option<GraffitiString>,
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Web3SignerValidatorRequest {
pub enable: bool,
pub description: String,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub graffiti: Option<GraffitiString>,
pub voting_public_key: PublicKey,
pub url: String,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub root_certificate_path: Option<PathBuf>,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub request_timeout_ms: Option<u64>,
}

View File

@ -283,6 +283,18 @@ pub fn set_gauge_vec(int_gauge_vec: &Result<IntGaugeVec>, name: &[&str], value:
} }
} }
pub fn inc_gauge_vec(int_gauge_vec: &Result<IntGaugeVec>, name: &[&str]) {
if let Some(gauge) = get_int_gauge(int_gauge_vec, name) {
gauge.inc();
}
}
pub fn dec_gauge_vec(int_gauge_vec: &Result<IntGaugeVec>, name: &[&str]) {
if let Some(gauge) = get_int_gauge(int_gauge_vec, name) {
gauge.dec();
}
}
pub fn set_gauge(gauge: &Result<IntGauge>, value: i64) { pub fn set_gauge(gauge: &Result<IntGauge>, value: i64) {
if let Ok(gauge) = gauge { if let Ok(gauge) = gauge {
gauge.set(value); gauge.set(value);

View File

@ -237,39 +237,33 @@ impl TaskExecutor {
{ {
let log = self.log.clone(); let log = self.log.clone();
if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]) { let timer = metrics::start_timer_vec(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]);
if let Some(int_gauge) = metrics::get_int_gauge(&metrics::BLOCKING_TASKS_COUNT, &[name]) metrics::inc_gauge_vec(&metrics::BLOCKING_TASKS_COUNT, &[name]);
{
let int_gauge_1 = int_gauge;
let timer = metric.start_timer();
let join_handle = if let Some(runtime) = self.runtime.upgrade() {
runtime.spawn_blocking(task)
} else {
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
return None;
};
Some(async move { let join_handle = if let Some(runtime) = self.runtime.upgrade() {
let result = match join_handle.await { runtime.spawn_blocking(task)
Ok(result) => {
trace!(log, "Blocking task completed"; "task" => name);
Ok(result)
}
Err(e) => {
debug!(log, "Blocking task ended unexpectedly"; "error" => %e);
Err(e)
}
};
timer.observe_duration();
int_gauge_1.dec();
result
})
} else {
None
}
} else { } else {
None debug!(self.log, "Couldn't spawn task. Runtime shutting down");
} return None;
};
let future = async move {
let result = match join_handle.await {
Ok(result) => {
trace!(log, "Blocking task completed"; "task" => name);
Ok(result)
}
Err(e) => {
debug!(log, "Blocking task ended unexpectedly"; "error" => %e);
Err(e)
}
};
drop(timer);
metrics::dec_gauge_vec(&metrics::BLOCKING_TASKS_COUNT, &[name]);
result
};
Some(future)
} }
pub fn runtime(&self) -> Weak<Runtime> { pub fn runtime(&self) -> Weak<Runtime> {

View File

@ -9,7 +9,7 @@ use crate::{test_utils::TestRandom, Hash256, Slot};
use super::{ use super::{
AggregateSignature, AttestationData, BitList, ChainSpec, Domain, EthSpec, Fork, SecretKey, AggregateSignature, AttestationData, BitList, ChainSpec, Domain, EthSpec, Fork, SecretKey,
SignedRoot, Signature, SignedRoot,
}; };
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -60,6 +60,25 @@ impl<T: EthSpec> Attestation<T> {
fork: &Fork, fork: &Fork,
genesis_validators_root: Hash256, genesis_validators_root: Hash256,
spec: &ChainSpec, spec: &ChainSpec,
) -> Result<(), Error> {
let domain = spec.get_domain(
self.data.target.epoch,
Domain::BeaconAttester,
fork,
genesis_validators_root,
);
let message = self.data.signing_root(domain);
self.add_signature(&secret_key.sign(message), committee_position)
}
/// Adds `signature` to `self` and sets the `committee_position`'th bit of `aggregation_bits` to `true`.
///
/// Returns an `AlreadySigned` error if the `committee_position`'th bit is already `true`.
pub fn add_signature(
&mut self,
signature: &Signature,
committee_position: usize,
) -> Result<(), Error> { ) -> Result<(), Error> {
if self if self
.aggregation_bits .aggregation_bits
@ -72,15 +91,7 @@ impl<T: EthSpec> Attestation<T> {
.set(committee_position, true) .set(committee_position, true)
.map_err(Error::SszTypesError)?; .map_err(Error::SszTypesError)?;
let domain = spec.get_domain( self.signature.add_assign(signature);
self.data.target.epoch,
Domain::BeaconAttester,
fork,
genesis_validators_root,
);
let message = self.data.signing_root(domain);
self.signature.add_assign(&secret_key.sign(message));
Ok(()) Ok(())
} }

View File

@ -12,6 +12,7 @@ use tree_hash_derive::TreeHash;
)] )]
pub struct SyncAggregatorSelectionData { pub struct SyncAggregatorSelectionData {
pub slot: Slot, pub slot: Slot,
#[serde(with = "eth2_serde_utils::quoted_u64")]
pub subcommittee_index: u64, pub subcommittee_index: u64,
} }

View File

@ -0,0 +1,35 @@
[package]
name = "web3signer_tests"
version = "0.1.0"
edition = "2018"
build = "build.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
[dev-dependencies]
eth2_keystore = { path = "../../crypto/eth2_keystore" }
types = { path = "../../consensus/types" }
tempfile = "3.1.0"
tokio = { version = "1.10.0", features = ["rt-multi-thread", "macros"] }
reqwest = { version = "0.11.0", features = ["json","stream"] }
url = "2.2.2"
validator_client = { path = "../../validator_client" }
slot_clock = { path = "../../common/slot_clock" }
futures = "0.3.7"
exit-future = "0.2.0"
task_executor = { path = "../../common/task_executor" }
environment = { path = "../../lighthouse/environment" }
account_utils = { path = "../../common/account_utils" }
serde = "1.0.116"
serde_derive = "1.0.116"
serde_yaml = "0.8.13"
eth2_network_config = { path = "../../common/eth2_network_config" }
[build-dependencies]
tokio = { version = "1.10.0", features = ["rt-multi-thread", "macros"] }
reqwest = { version = "0.11.0", features = ["json","stream"] }
serde_json = "1.0.58"
zip = "0.5.13"

View File

@ -0,0 +1,100 @@
//! This build script downloads the latest Web3Signer release and places it in the `OUT_DIR` so it
//! can be used for integration testing.
use reqwest::Client;
use serde_json::Value;
use std::env;
use std::fs;
use std::path::PathBuf;
use zip::ZipArchive;
/// Use `None` to download the latest Github release.
/// Use `Some("21.8.1")` to download a specific version.
const FIXED_VERSION_STRING: Option<&str> = None;
#[tokio::main]
async fn main() {
let out_dir = env::var("OUT_DIR").unwrap();
download_binary(out_dir.into()).await;
}
pub async fn download_binary(dest_dir: PathBuf) {
let version_file = dest_dir.join("version");
let client = Client::builder()
// Github gives a 403 without a user agent.
.user_agent("web3signer_tests")
.build()
.unwrap();
let version = if let Some(version) = FIXED_VERSION_STRING {
version.to_string()
} else {
// Get the latest release of the web3 signer repo.
let latest_response: Value = client
.get("https://api.github.com/repos/ConsenSys/web3signer/releases/latest")
.send()
.await
.unwrap()
.error_for_status()
.unwrap()
.json()
.await
.unwrap();
latest_response
.get("tag_name")
.unwrap()
.as_str()
.unwrap()
.to_string()
};
if version_file.exists() && fs::read(&version_file).unwrap() == version.as_bytes() {
// The latest version is already downloaded, do nothing.
return;
} else {
// Ignore the result since we don't care if the version file already exists.
let _ = fs::remove_file(&version_file);
}
// Download the latest release zip.
let zip_url = format!("https://artifacts.consensys.net/public/web3signer/raw/names/web3signer.zip/versions/{}/web3signer-{}.zip", version, version);
let zip_response = client
.get(zip_url)
.send()
.await
.unwrap()
.error_for_status()
.unwrap()
.bytes()
.await
.unwrap();
// Write the zip to a file.
let zip_path = dest_dir.join(format!("{}.zip", version));
fs::write(&zip_path, zip_response).unwrap();
// Unzip the zip.
let mut zip_file = fs::File::open(&zip_path).unwrap();
ZipArchive::new(&mut zip_file)
.unwrap()
.extract(&dest_dir)
.unwrap();
// Rename the web3signer directory so it doesn't include the version string. This ensures the
// path to the binary is predictable.
let web3signer_dir = dest_dir.join("web3signer");
if web3signer_dir.exists() {
fs::remove_dir_all(&web3signer_dir).unwrap();
}
fs::rename(
dest_dir.join(format!("web3signer-{}", version)),
web3signer_dir,
)
.unwrap();
// Delete zip and unzipped dir.
fs::remove_file(&zip_path).unwrap();
// Update the version file to avoid duplicate downloads.
fs::write(&version_file, version).unwrap();
}

View File

@ -0,0 +1,589 @@
//! This crate provides a series of integration tests between the Lighthouse `ValidatorStore` and
//! Web3Signer by Consensys.
//!
//! These tests aim to ensure that:
//!
//! - Lighthouse can issue valid requests to Web3Signer.
//! - The signatures generated by Web3Signer are identical to those which Lighthouse generates.
//!
//! There is a build script in this crate which obtains the latest version of Web3Signer and makes
//! it available via the `OUT_DIR`.
#[cfg(all(test, unix, not(debug_assertions)))]
mod tests {
use account_utils::validator_definitions::{
SigningDefinition, ValidatorDefinition, ValidatorDefinitions,
};
use eth2_keystore::KeystoreBuilder;
use eth2_network_config::Eth2NetworkConfig;
use reqwest::Client;
use serde::Serialize;
use slot_clock::{SlotClock, TestingSlotClock};
use std::env;
use std::fmt::Debug;
use std::fs::{self, File};
use std::future::Future;
use std::path::PathBuf;
use std::process::{Child, Command, Stdio};
use std::sync::Arc;
use std::time::{Duration, Instant};
use task_executor::TaskExecutor;
use tempfile::TempDir;
use tokio::time::sleep;
use types::*;
use url::Url;
use validator_client::{
initialized_validators::{load_pem_certificate, InitializedValidators},
validator_store::ValidatorStore,
SlashingDatabase, SLASHING_PROTECTION_FILENAME,
};
/// If the we are unable to reach the Web3Signer HTTP API within this time out then we will
/// assume it failed to start.
const UPCHECK_TIMEOUT: Duration = Duration::from_secs(20);
/// Set to `true` to send the Web3Signer logs to the console during tests. Logs are useful when
/// debugging.
const SUPPRESS_WEB3SIGNER_LOGS: bool = true;
type E = MainnetEthSpec;
/// This marker trait is implemented for objects that we wish to compare to ensure Web3Signer
/// and Lighthouse agree on signatures.
///
/// The purpose of this trait is to prevent accidentally comparing useless values like `()`.
trait SignedObject: PartialEq + Debug {}
impl SignedObject for Signature {}
impl SignedObject for Attestation<E> {}
impl SignedObject for SignedBeaconBlock<E> {}
impl SignedObject for SignedAggregateAndProof<E> {}
impl SignedObject for SelectionProof {}
impl SignedObject for SyncSelectionProof {}
impl SignedObject for SyncCommitteeMessage {}
impl SignedObject for SignedContributionAndProof<E> {}
/// A file format used by Web3Signer to discover and unlock keystores.
#[derive(Serialize)]
struct Web3SignerKeyConfig {
#[serde(rename = "type")]
config_type: String,
#[serde(rename = "keyType")]
key_type: String,
#[serde(rename = "keystoreFile")]
keystore_file: String,
#[serde(rename = "keystorePasswordFile")]
keystore_password_file: String,
}
const KEYSTORE_PASSWORD: &str = "hi mum";
const WEB3SIGNER_LISTEN_ADDRESS: &str = "127.0.0.1";
/// A deterministic, arbitrary keypair.
fn testing_keypair() -> Keypair {
// Just an arbitrary secret key.
let sk = SecretKey::deserialize(&[
85, 40, 245, 17, 84, 193, 234, 155, 24, 234, 181, 58, 171, 193, 209, 164, 120, 147, 10,
174, 189, 228, 119, 48, 181, 19, 117, 223, 2, 240, 7, 108,
])
.unwrap();
let pk = sk.public_key();
Keypair::from_components(pk, sk)
}
/// The location of the Web3Signer binary generated by the build script.
fn web3signer_binary() -> PathBuf {
PathBuf::from(env::var("OUT_DIR").unwrap())
.join("web3signer")
.join("bin")
.join("web3signer")
}
/// The location of a directory where we keep some files for testing TLS.
fn tls_dir() -> PathBuf {
PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap()).join("tls")
}
fn root_certificate_path() -> PathBuf {
tls_dir().join("cert.pem")
}
/// A testing rig which holds a live Web3Signer process.
struct Web3SignerRig {
keypair: Keypair,
_keystore_dir: TempDir,
keystore_path: PathBuf,
web3signer_child: Child,
http_client: Client,
url: Url,
}
impl Drop for Web3SignerRig {
fn drop(&mut self) {
self.web3signer_child.kill().unwrap();
}
}
impl Web3SignerRig {
pub async fn new(network: &str, listen_address: &str, listen_port: u16) -> Self {
let keystore_dir = TempDir::new().unwrap();
let keypair = testing_keypair();
let keystore =
KeystoreBuilder::new(&keypair, KEYSTORE_PASSWORD.as_bytes(), "".to_string())
.unwrap()
.build()
.unwrap();
let keystore_filename = "keystore.json";
let keystore_path = keystore_dir.path().join(keystore_filename);
let keystore_file = File::create(&keystore_path).unwrap();
keystore.to_json_writer(&keystore_file).unwrap();
let keystore_password_filename = "password.txt";
let keystore_password_path = keystore_dir.path().join(keystore_password_filename);
fs::write(&keystore_password_path, KEYSTORE_PASSWORD.as_bytes()).unwrap();
let key_config = Web3SignerKeyConfig {
config_type: "file-keystore".to_string(),
key_type: "BLS".to_string(),
keystore_file: keystore_filename.to_string(),
keystore_password_file: keystore_password_filename.to_string(),
};
let key_config_file =
File::create(&keystore_dir.path().join("key-config.yaml")).unwrap();
serde_yaml::to_writer(key_config_file, &key_config).unwrap();
let tls_keystore_file = tls_dir().join("key.p12");
let tls_keystore_password_file = tls_dir().join("password.txt");
let stdio = || {
if SUPPRESS_WEB3SIGNER_LOGS {
Stdio::null()
} else {
Stdio::inherit()
}
};
let web3signer_child = Command::new(web3signer_binary())
.arg(format!(
"--key-store-path={}",
keystore_dir.path().to_str().unwrap()
))
.arg(format!("--http-listen-host={}", listen_address))
.arg(format!("--http-listen-port={}", listen_port))
.arg("--tls-allow-any-client=true")
.arg(format!(
"--tls-keystore-file={}",
tls_keystore_file.to_str().unwrap()
))
.arg(format!(
"--tls-keystore-password-file={}",
tls_keystore_password_file.to_str().unwrap()
))
.arg("eth2")
.arg(format!("--network={}", network))
.arg("--slashing-protection-enabled=false")
.stdout(stdio())
.stderr(stdio())
.spawn()
.unwrap();
let url = Url::parse(&format!("https://{}:{}", listen_address, listen_port)).unwrap();
let certificate = load_pem_certificate(root_certificate_path()).unwrap();
let http_client = Client::builder()
.add_root_certificate(certificate)
.build()
.unwrap();
let s = Self {
keypair,
_keystore_dir: keystore_dir,
keystore_path,
web3signer_child,
http_client,
url,
};
s.wait_until_up(UPCHECK_TIMEOUT).await;
s
}
pub async fn wait_until_up(&self, timeout: Duration) {
let start = Instant::now();
loop {
if self.upcheck().await.is_ok() {
return;
} else if Instant::now().duration_since(start) > timeout {
panic!("upcheck failed with timeout {:?}", timeout)
} else {
sleep(Duration::from_secs(1)).await;
}
}
}
pub async fn upcheck(&self) -> Result<(), ()> {
let url = self.url.join("upcheck").unwrap();
self.http_client
.get(url)
.send()
.await
.map_err(|_| ())?
.error_for_status()
.map(|_| ())
.map_err(|_| ())
}
}
/// A testing rig which holds a `ValidatorStore`.
struct ValidatorStoreRig {
validator_store: Arc<ValidatorStore<TestingSlotClock, E>>,
_validator_dir: TempDir,
runtime: Arc<tokio::runtime::Runtime>,
_runtime_shutdown: exit_future::Signal,
}
impl ValidatorStoreRig {
pub async fn new(validator_definitions: Vec<ValidatorDefinition>, spec: ChainSpec) -> Self {
let log = environment::null_logger().unwrap();
let validator_dir = TempDir::new().unwrap();
let validator_definitions = ValidatorDefinitions::from(validator_definitions);
let initialized_validators = InitializedValidators::from_definitions(
validator_definitions,
validator_dir.path().into(),
log.clone(),
)
.await
.unwrap();
let voting_pubkeys: Vec<_> = initialized_validators.iter_voting_pubkeys().collect();
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
);
let (runtime_shutdown, exit) = exit_future::signal();
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor =
TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx);
let slashing_db_path = validator_dir.path().join(SLASHING_PROTECTION_FILENAME);
let slashing_protection = SlashingDatabase::open_or_create(&slashing_db_path).unwrap();
slashing_protection
.register_validators(voting_pubkeys.iter().copied())
.unwrap();
let slot_clock =
TestingSlotClock::new(Slot::new(0), Duration::from_secs(0), Duration::from_secs(1));
let validator_store = ValidatorStore::<_, E>::new(
initialized_validators,
slashing_protection,
Hash256::repeat_byte(42),
spec,
None,
slot_clock,
executor,
log.clone(),
);
Self {
validator_store: Arc::new(validator_store),
_validator_dir: validator_dir,
runtime,
_runtime_shutdown: runtime_shutdown,
}
}
pub fn shutdown(self) {
Arc::try_unwrap(self.runtime).unwrap().shutdown_background()
}
}
/// A testing rig which holds multiple `ValidatorStore` rigs and one `Web3Signer` rig.
///
/// The intent of this rig is to allow testing a `ValidatorStore` using `Web3Signer` against
/// another `ValidatorStore` using a local keystore and ensure that both `ValidatorStore`s
/// behave identically.
struct TestingRig {
_signer_rig: Web3SignerRig,
validator_rigs: Vec<ValidatorStoreRig>,
validator_pubkey: PublicKeyBytes,
}
impl Drop for TestingRig {
fn drop(&mut self) {
for rig in std::mem::take(&mut self.validator_rigs) {
rig.shutdown();
}
}
}
impl TestingRig {
pub async fn new(network: &str, spec: ChainSpec, listen_port: u16) -> Self {
let signer_rig =
Web3SignerRig::new(network, WEB3SIGNER_LISTEN_ADDRESS, listen_port).await;
let validator_pubkey = signer_rig.keypair.pk.clone();
let local_signer_validator_store = {
let validator_definition = ValidatorDefinition {
enabled: true,
voting_public_key: validator_pubkey.clone(),
graffiti: None,
description: String::default(),
signing_definition: SigningDefinition::LocalKeystore {
voting_keystore_path: signer_rig.keystore_path.clone(),
voting_keystore_password_path: None,
voting_keystore_password: Some(KEYSTORE_PASSWORD.to_string().into()),
},
};
ValidatorStoreRig::new(vec![validator_definition], spec.clone()).await
};
let remote_signer_validator_store = {
let validator_definition = ValidatorDefinition {
enabled: true,
voting_public_key: validator_pubkey.clone(),
graffiti: None,
description: String::default(),
signing_definition: SigningDefinition::Web3Signer {
url: signer_rig.url.to_string(),
root_certificate_path: Some(root_certificate_path()),
request_timeout_ms: None,
},
};
ValidatorStoreRig::new(vec![validator_definition], spec).await
};
Self {
_signer_rig: signer_rig,
validator_rigs: vec![local_signer_validator_store, remote_signer_validator_store],
validator_pubkey: PublicKeyBytes::from(&validator_pubkey),
}
}
/// Run the `generate_sig` function across all validator stores on `self` and assert that
/// they all return the same value.
pub async fn assert_signatures_match<F, R, S>(
self,
case_name: &str,
generate_sig: F,
) -> Self
where
F: Fn(PublicKeyBytes, Arc<ValidatorStore<TestingSlotClock, E>>) -> R,
R: Future<Output = S>,
// We use the `SignedObject` trait to white-list objects for comparison. This avoids
// accidentally comparing something meaningless like a `()`.
S: SignedObject,
{
let mut prev_signature = None;
for (i, validator_rig) in self.validator_rigs.iter().enumerate() {
let signature =
generate_sig(self.validator_pubkey, validator_rig.validator_store.clone())
.await;
if let Some(prev_signature) = &prev_signature {
assert_eq!(
prev_signature, &signature,
"signature mismatch at index {} for case {}",
i, case_name
);
}
prev_signature = Some(signature)
}
assert!(prev_signature.is_some(), "sanity check");
self
}
}
/// Get a generic, arbitrary attestation for signing.
fn get_attestation() -> Attestation<E> {
Attestation {
aggregation_bits: BitList::with_capacity(1).unwrap(),
data: AttestationData {
slot: <_>::default(),
index: <_>::default(),
beacon_block_root: <_>::default(),
source: Checkpoint {
epoch: <_>::default(),
root: <_>::default(),
},
target: Checkpoint {
epoch: <_>::default(),
root: <_>::default(),
},
},
signature: AggregateSignature::empty(),
}
}
/// Test all the "base" (phase 0) types.
async fn test_base_types(network: &str, listen_port: u16) {
let network_config = Eth2NetworkConfig::constant(network).unwrap().unwrap();
let spec = &network_config.chain_spec::<E>().unwrap();
TestingRig::new(network, spec.clone(), listen_port)
.await
.assert_signatures_match("randao_reveal", |pubkey, validator_store| async move {
validator_store
.randao_reveal(pubkey, Epoch::new(0))
.await
.unwrap()
})
.await
.assert_signatures_match("beacon_block_base", |pubkey, validator_store| async move {
let block = BeaconBlock::Base(BeaconBlockBase::empty(spec));
let block_slot = block.slot();
validator_store
.sign_block(pubkey, block, block_slot)
.await
.unwrap()
})
.await
.assert_signatures_match("attestation", |pubkey, validator_store| async move {
let mut attestation = get_attestation();
validator_store
.sign_attestation(pubkey, 0, &mut attestation, Epoch::new(0))
.await
.unwrap();
attestation
})
.await
.assert_signatures_match("signed_aggregate", |pubkey, validator_store| async move {
let attestation = get_attestation();
validator_store
.produce_signed_aggregate_and_proof(
pubkey,
0,
attestation,
SelectionProof::from(Signature::empty()),
)
.await
.unwrap()
})
.await
.assert_signatures_match("selection_proof", |pubkey, validator_store| async move {
validator_store
.produce_selection_proof(pubkey, Slot::new(0))
.await
.unwrap()
})
.await;
}
/// Test all the Altair types.
async fn test_altair_types(network: &str, listen_port: u16) {
let network_config = Eth2NetworkConfig::constant(network).unwrap().unwrap();
let spec = &network_config.chain_spec::<E>().unwrap();
let altair_fork_slot = spec
.altair_fork_epoch
.unwrap()
.start_slot(E::slots_per_epoch());
TestingRig::new(network, spec.clone(), listen_port)
.await
.assert_signatures_match(
"beacon_block_altair",
|pubkey, validator_store| async move {
let mut altair_block = BeaconBlockAltair::empty(spec);
altair_block.slot = altair_fork_slot;
validator_store
.sign_block(pubkey, BeaconBlock::Altair(altair_block), altair_fork_slot)
.await
.unwrap()
},
)
.await
.assert_signatures_match(
"sync_selection_proof",
|pubkey, validator_store| async move {
validator_store
.produce_sync_selection_proof(
&pubkey,
altair_fork_slot,
SyncSubnetId::from(0),
)
.await
.unwrap()
},
)
.await
.assert_signatures_match(
"sync_committee_signature",
|pubkey, validator_store| async move {
validator_store
.produce_sync_committee_signature(
altair_fork_slot,
Hash256::zero(),
0,
&pubkey,
)
.await
.unwrap()
},
)
.await
.assert_signatures_match(
"signed_contribution_and_proof",
|pubkey, validator_store| async move {
let contribution = SyncCommitteeContribution {
slot: altair_fork_slot,
beacon_block_root: <_>::default(),
subcommittee_index: <_>::default(),
aggregation_bits: <_>::default(),
signature: AggregateSignature::empty(),
};
validator_store
.produce_signed_contribution_and_proof(
0,
pubkey,
contribution,
SyncSelectionProof::from(Signature::empty()),
)
.await
.unwrap()
},
)
.await;
}
#[tokio::test]
async fn mainnet_base_types() {
test_base_types("mainnet", 4242).await
}
/* The Altair fork for mainnet has not been announced, so this test will always fail.
*
* If this test starts failing, it's likely that the fork has been decided and we should remove
* the `#[should_panic]`
*/
#[tokio::test]
#[should_panic]
async fn mainnet_altair_types() {
test_altair_types("mainnet", 4243).await
}
#[tokio::test]
async fn pyrmont_base_types() {
test_base_types("pyrmont", 4244).await
}
#[tokio::test]
async fn pyrmont_altair_types() {
test_altair_types("pyrmont", 4245).await
}
#[tokio::test]
async fn prater_base_types() {
test_base_types("prater", 4246).await
}
#[tokio::test]
async fn prater_altair_types() {
test_altair_types("prater", 4247).await
}
}

View File

@ -0,0 +1,6 @@
## TLS Testing Files
The files in this directory are used for testing TLS with web3signer. We store them in this
repository since the are not sensitive and it's simpler than regenerating them for each test.
The files were generated using the `./generate.sh` script.

View File

@ -0,0 +1,32 @@
-----BEGIN CERTIFICATE-----
MIIFmTCCA4GgAwIBAgIUd6yn4o1bKr2YpzTxcBmoiM4PorkwDQYJKoZIhvcNAQEL
BQAwajELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlZBMREwDwYDVQQHDAhTb21lQ2l0
eTESMBAGA1UECgwJTXlDb21wYW55MRMwEQYDVQQLDApNeURpdmlzaW9uMRIwEAYD
VQQDDAkxMjcuMC4wLjEwIBcNMjEwOTA2MDgxMDU2WhgPMjEyMTA4MTMwODEwNTZa
MGoxCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJWQTERMA8GA1UEBwwIU29tZUNpdHkx
EjAQBgNVBAoMCU15Q29tcGFueTETMBEGA1UECwwKTXlEaXZpc2lvbjESMBAGA1UE
AwwJMTI3LjAuMC4xMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAx/a1
SRqehj/D18166GcJh/zOyDtZCbeoLWcVfS1aBq+J1FFy4LYKWgwNhOYsrxHLhsIr
/LpHpRm/FFqLPxGNoEPMcJi1dLcELPcJAG1l+B0Ur52V/nxOmzn71Mi0WQv0oOFx
hOtUOToY3heVW0JXgrILhdD834mWdsxBWPhq1LeLZcMth4woMgD9AH4KzxUNtFvo
8i8IneEYvoDIQ8dGZ5lHnFV5kaC8Is0hevMljTw83E9BD0B/bpp+o2rByccVulsy
/WK763tFteDxK5eZZ3/5rRId+uoN5+D4oRnG6zuki0t7+eTZo1cUPi28IIDTNjPR
Xvw35dt+SdTDjtI/FUf8VWhLIHZZXaevFliuBbcuOMpWCdjAdwb7Uf9WpMnxzZtK
fatAC9dk3VPsehFcf6w/H+ah3tu/szAaDJ5zZb0m05cAxDZekZ9SccBIPglccM3f
vzNjrDIoi4z7uCiTJc2FW0qb2MzusQsGjtLW53n7IGoSIFDvOhiZa9D+vOE2wG6o
VNf2K9/QvwNDCzRvW81mcUCRr/BhcAmX5drwYPwUEcdBXQeFPt6nZ33fmIgl2Cbv
io9kUJzjlQWOZ6BX5FmC69dWAedcfHGY693tG6LQKk9a5B+NiuIB4m1bHcvjYhsh
GqVrw980YIN52RmIoskGRdt34/gKHWcqjIEK0+kCAwEAAaM1MDMwCwYDVR0PBAQD
AgQwMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1UdEQQIMAaHBH8AAAEwDQYJKoZI
hvcNAQELBQADggIBAILVu5ppYnumyxvchgSLAi/ahBZV/wmtI3X8vxOHuQwYF8rZ
7b2gd+PClJBuhxeOEJZTtCSDMMUdlBXsxnoftp0TcDhFXeAlSp0JQe38qGAlX94l
4ZH39g+Ut5kVpImb/nI/iQhdOSDzQHaivTMjhNlBW+0EqvVJ1YsjjovtcxXh8gbv
4lKpGkuT6xVRrSGsZh0LQiVtngKNqte8vBvFWBQfj9JFyoYmpSvYl/LaYjYkmCya
V2FbfrhDXDI0IereknqMKDs8rF4Ik6i22b+uG91yyJsRFh63x7agEngpoxYKYV6V
5YXIzH5kLX8hklHnLgVhES2ZjhheDgC8pCRUCPqR4+KVnQcFRHP9MJCqcEIFAppD
oHITdiFDs/qE0EDV9WW1iOWgBmdgxUZ8dh1CfW+7B72+Uy0/eXWdnlrRDe5cN/hs
xXpnLCMfzSDEMA4WmImabpU/fRXL7pazZENJj7iyIAr/pEL34+QjqVfWaXkWrHoN
KsrkxTdoZNVdarBDSw9JtMUECmnWYOjMaOm1O8waib9H1SlPSSPrK5pGT/6h1g0d
LM982X36Ej8XyW33E5l6qWiLVRye7SaAvZbVLsyd+cfemi6BPsK+y09eCs4a+Qp7
9YWZOPT6s/ahJYdTGF961JZ62ypIioimW6wx8hAMCkKKfhn1WI0+0RlOrjbw
-----END CERTIFICATE-----

View File

@ -0,0 +1,19 @@
[req]
default_bits = 4096
default_md = sha256
distinguished_name = req_distinguished_name
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
C = US
ST = VA
L = SomeCity
O = MyCompany
OU = MyDivision
CN = 127.0.0.1
[v3_req]
keyUsage = keyEncipherment, dataEncipherment
extendedKeyUsage = serverAuth
subjectAltName = @alt_names
[alt_names]
IP.1 = 127.0.0.1

View File

@ -0,0 +1,4 @@
#!/bin/bash
openssl req -x509 -sha256 -nodes -days 36500 -newkey rsa:4096 -keyout key.key -out cert.pem -config config &&
openssl pkcs12 -export -out key.p12 -inkey key.key -in cert.pem -password pass:$(cat password.txt)

View File

@ -0,0 +1,52 @@
-----BEGIN PRIVATE KEY-----
MIIJRAIBADANBgkqhkiG9w0BAQEFAASCCS4wggkqAgEAAoICAQDH9rVJGp6GP8PX
zXroZwmH/M7IO1kJt6gtZxV9LVoGr4nUUXLgtgpaDA2E5iyvEcuGwiv8ukelGb8U
Wos/EY2gQ8xwmLV0twQs9wkAbWX4HRSvnZX+fE6bOfvUyLRZC/Sg4XGE61Q5Ohje
F5VbQleCsguF0PzfiZZ2zEFY+GrUt4tlwy2HjCgyAP0AfgrPFQ20W+jyLwid4Ri+
gMhDx0ZnmUecVXmRoLwizSF68yWNPDzcT0EPQH9umn6jasHJxxW6WzL9Yrvre0W1
4PErl5lnf/mtEh366g3n4PihGcbrO6SLS3v55NmjVxQ+LbwggNM2M9Fe/Dfl235J
1MOO0j8VR/xVaEsgdlldp68WWK4Fty44ylYJ2MB3BvtR/1akyfHNm0p9q0AL12Td
U+x6EVx/rD8f5qHe27+zMBoMnnNlvSbTlwDENl6Rn1JxwEg+CVxwzd+/M2OsMiiL
jPu4KJMlzYVbSpvYzO6xCwaO0tbnefsgahIgUO86GJlr0P684TbAbqhU1/Yr39C/
A0MLNG9bzWZxQJGv8GFwCZfl2vBg/BQRx0FdB4U+3qdnfd+YiCXYJu+Kj2RQnOOV
BY5noFfkWYLr11YB51x8cZjr3e0botAqT1rkH42K4gHibVsdy+NiGyEapWvD3zRg
g3nZGYiiyQZF23fj+AodZyqMgQrT6QIDAQABAoICAGMICuZGmaXxJIPXDvzUMsM3
cA14XvNSEqdRuzHAaSqQexk8sUEaxuurtnJQMGcP0BVQSsqiUuMwahKheP7mKZbq
nPBSoONJ1HaUbc/ZXjvP4zPKPsPHOoLj55WNRMwpAKFApaDnj1G8NR6g3WZR59ch
aFWAmAv5LxxsshxnAzmQIShnzj+oKSwCk0pQIfhG+/+L2UVAB+tw1HlcfFIc+gBK
yE1jg46c5S/zGZaznrBg2d9eHOF51uKm/vrd31WYFGmzyv/0iw7ngTG/UpF9Rgsd
NUECjPh8PCDPqTLX+kz7v9UAsEiljye2856LtfT++BuK9DEvhlt/Jf9YsPUlqPl3
3wUG8yiqBQrlGTUY1KUdHsulmbTiq4Q9ch5QLcvazk+9c7hlB6WP+/ofqgIPSlDt
fOHkROmO7GURz78lVM8+E/pRgy6qDq+yM1uVMeWWme4hKfOAL2lnJDTO4PKNQA4b
03YXsdVSz4mm9ppnyHIPXei6/qHpU/cRRf261HNEI16eC0ZnoIAxhORJtxo6kMns
am4yuhHm9qLjbOI1uJPAgpR/o0O5NaBgkdEzJ102pmv2grf2U743n9bqu+y/vJF9
HRmMDdJgZSmcYxQuLe0INzLDnTzOdmjbqjB6lDsSwtrEo/KLtXIStrFMKSHIE/QV
96u8nWPomN83HqkVvQmBAoIBAQDrs8eKAQ3meWtmsSqlzCNVAsJA1xV4DtNaWBTz
MJXwRWywem/sHCoPsJ7c5UTUjQDOfNEUu8iW/m60dt0U+81/O9TLBP1Td6jxLg8X
92atLs8wHQDUqrgouce0lyS7to+R3K+N8YtWL2y9w9jbf/XT9iTL5TXGc8RFrmMg
nDQ1EShojU0U0I1lKpDJTx2R1FANfyd3iHSsENRwYj5MF8iQSag79Ek06BKLWHHt
OJj2oiO3VIAKQYVA9aKxfiiOWXWumPHq7r6UoNJK3UNzfBvguhEzl8k6VjZBCR9q
WwvSTba4mOgHMIXdV/9Wr3y8Cus2lX5YGOK4OUx/ZaCdaBtZAoIBAQDZLwwZDHen
Iw1412m/D/6HBS38bX78t+0hL7LNqgVpiZdNbLq57SGRbUnZZ/jlmtyLw3be6BV3
IcLyflYW+4Wi8AAqVADlXjMC+GIuDNCCicwWxJeIFaAGM7Jt6Fa08H/loIAMM7NC
y1CmQnCR9OnHRdcBaU1y4ForP4f8B/hwh3hSQEFPKgF/MQwDnR7UzPgRrUOTovN/
4D7j1Wx6FpYX9hGZL0i2K1ygRZE03t6VV7xhCkne96VvDEj1Zo/S4HFaEmDD+EjR
pvXVhPRed7GZ6AMs2JxOPhRiu3G+AQL1HPMDlA8QiPtTh0Zf99j/5NXKBEyH/fp1
V04L1s7wf7sRAoIBAQCb3/ftJ0dXDSNe9Xl7ziXrmXh3wwYasMtLawbn0VDHZlI7
36zW28VhPO/CrAi5/En1RIxNBubgHIF/7T/GGcRMCXhvjuwtX+wlG821jtKjY1p3
uiaLfh9uJ3aP0ojjbxdBYk3jNENuisyCLtviRZyAQb8R7JKEnJjHcE10CnloQuGT
SycXxdhMeDrqNt0aTOtoEZg7L83g4PxtGjuSvQPRkDSm+aXUTEm/R42IUS6vpIi0
PDi1D6GdVRT0BrexdC4kelc6hAsbZcPM6MkrvX7+Pm8TzKSyZMNafTr+bhnCScy2
BcEkyA0vVXuyizmVbi8hmPnGLyb4qEQT2FTA5FF5AoIBAQCEj0vCCjMKB8IUTN7V
aGzBeq7b0PVeSODqjZOEJk9RYFLCRigejZccjWky0lw/wGr2v6JRYbSgVzIHEod3
VaP2lKh1LXqyhPF70aETXGz0EClKiEm5HQHkZy90GAi8PcLCpFkjmXbDwRcDs6/D
1onOQFmAGgbUpA1FMmzMrwy7mmQdR+zU5d2uBYDAv+jumACdwXRqq14WYgfgxgaE
6j5Id7+8EPk/f230wSFk9NdErh1j2YTHG76U7hml9yi33JgzEt6PHn9Lv61y2sjQ
1BvJxawSdk/JDekhbil5gGKOu1G0kG01eXZ1QC77Kmr/nWvD9yXDJ4j0kAop/b2n
Wz8RAoIBAQDn1ZZGOJuVRUoql2A65zwtu34IrYD+2zQQCBf2hGHtwXT6ovqRFqPV
vcQ7KJP+zVT4GimFlZy7lUx8H4j7+/Bxn+PpUHHoDYjVURr12wk2w8pxwcKnbiIw
qaMkF5KG2IUVb7F8STEuKv4KKeuRlB4K2HC2J8GZOLXO21iOqNMhMRO11wp9jkKI
n83wtLH34lLRz4VzIW3rfvPeVoP1zoDkLvD8k/Oyjrf4Bishg9vCHyhQkB1JDtMU
1bfH8mxwKozakpJa23a8lE5NLoc9NOZrKM4+cefY1MZ3FjlaZfkS5jlhY4Qhx+fl
+9j5xRPaH+mkJHaJIqzQad+b1A2eIa+L
-----END PRIVATE KEY-----

Binary file not shown.

View File

@ -0,0 +1 @@
meow

View File

@ -68,3 +68,5 @@ itertools = "0.10.0"
monitoring_api = { path = "../common/monitoring_api" } monitoring_api = { path = "../common/monitoring_api" }
sensitive_url = { path = "../common/sensitive_url" } sensitive_url = { path = "../common/sensitive_url" }
task_executor = { path = "../common/task_executor" } task_executor = { path = "../common/task_executor" }
reqwest = { version = "0.11.0", features = ["json","stream"] }
url = "2.2.2"

View File

@ -5,6 +5,7 @@ use crate::{
validator_store::ValidatorStore, validator_store::ValidatorStore,
}; };
use environment::RuntimeContext; use environment::RuntimeContext;
use futures::future::join_all;
use slog::{crit, error, info, trace}; use slog::{crit, error, info, trace};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::collections::HashMap; use std::collections::HashMap;
@ -288,7 +289,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Then download, sign and publish a `SignedAggregateAndProof` for each // Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and // validator that is elected to aggregate for this `slot` and
// `committee_index`. // `committee_index`.
self.produce_and_publish_aggregates(attestation_data, &validator_duties) self.produce_and_publish_aggregates(&attestation_data, &validator_duties)
.await .await
.map_err(move |e| { .map_err(move |e| {
crit!( crit!(
@ -350,10 +351,11 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
let mut attestations = Vec::with_capacity(validator_duties.len()); // Create futures to produce signed `Attestation` objects.
let attestation_data_ref = &attestation_data;
for duty_and_proof in validator_duties { let signing_futures = validator_duties.iter().map(|duty_and_proof| async move {
let duty = &duty_and_proof.duty; let duty = &duty_and_proof.duty;
let attestation_data = attestation_data_ref;
// Ensure that the attestation matches the duties. // Ensure that the attestation matches the duties.
#[allow(clippy::suspicious_operation_groupings)] #[allow(clippy::suspicious_operation_groupings)]
@ -368,7 +370,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
"duty_index" => duty.committee_index, "duty_index" => duty.committee_index,
"attestation_index" => attestation_data.index, "attestation_index" => attestation_data.index,
); );
continue; return None;
} }
let mut attestation = Attestation { let mut attestation = Attestation {
@ -377,26 +379,38 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
signature: AggregateSignature::infinity(), signature: AggregateSignature::infinity(),
}; };
if let Err(e) = self.validator_store.sign_attestation( match self
duty.pubkey, .validator_store
duty.validator_committee_index as usize, .sign_attestation(
&mut attestation, duty.pubkey,
current_epoch, duty.validator_committee_index as usize,
) { &mut attestation,
crit!( current_epoch,
log, )
"Failed to sign attestation"; .await
"error" => ?e, {
"committee_index" => committee_index, Ok(()) => Some(attestation),
"slot" => slot.as_u64(), Err(e) => {
); crit!(
continue; log,
} else { "Failed to sign attestation";
attestations.push(attestation); "error" => ?e,
"committee_index" => committee_index,
"slot" => slot.as_u64(),
);
None
}
} }
} });
let attestations_slice = attestations.as_slice(); // Execute all the futures in parallel, collecting any successful results.
let attestations = &join_all(signing_futures)
.await
.into_iter()
.flatten()
.collect::<Vec<Attestation<E>>>();
// Post the attestations to the BN.
match self match self
.beacon_nodes .beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move { .first_success(RequireSynced::No, |beacon_node| async move {
@ -405,7 +419,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
&[metrics::ATTESTATIONS_HTTP_POST], &[metrics::ATTESTATIONS_HTTP_POST],
); );
beacon_node beacon_node
.post_beacon_pool_attestations(attestations_slice) .post_beacon_pool_attestations(attestations)
.await .await
}) })
.await .await
@ -447,13 +461,12 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
/// returned to the BN. /// returned to the BN.
async fn produce_and_publish_aggregates( async fn produce_and_publish_aggregates(
&self, &self,
attestation_data: AttestationData, attestation_data: &AttestationData,
validator_duties: &[DutyAndProof], validator_duties: &[DutyAndProof],
) -> Result<(), String> { ) -> Result<(), String> {
let log = self.context.log(); let log = self.context.log();
let attestation_data_ref = &attestation_data; let aggregated_attestation = &self
let aggregated_attestation = self
.beacon_nodes .beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move { .first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec( let _timer = metrics::start_timer_vec(
@ -462,55 +475,59 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
); );
beacon_node beacon_node
.get_validator_aggregate_attestation( .get_validator_aggregate_attestation(
attestation_data_ref.slot, attestation_data.slot,
attestation_data_ref.tree_hash_root(), attestation_data.tree_hash_root(),
) )
.await .await
.map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))? .map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data_ref)) .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
.map(|result| result.data) .map(|result| result.data)
}) })
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
let mut signed_aggregate_and_proofs = Vec::new(); // Create futures to produce the signed aggregated attestations.
let signing_futures = validator_duties.iter().map(|duty_and_proof| async move {
for duty_and_proof in validator_duties {
let duty = &duty_and_proof.duty; let duty = &duty_and_proof.duty;
let selection_proof = duty_and_proof.selection_proof.as_ref()?;
let selection_proof = if let Some(proof) = duty_and_proof.selection_proof.as_ref() {
proof
} else {
// Do not produce a signed aggregate for validators that are not
// subscribed aggregators.
continue;
};
let slot = attestation_data.slot; let slot = attestation_data.slot;
let committee_index = attestation_data.index; let committee_index = attestation_data.index;
if duty.slot != slot || duty.committee_index != committee_index { if duty.slot != slot || duty.committee_index != committee_index {
crit!(log, "Inconsistent validator duties during signing"); crit!(log, "Inconsistent validator duties during signing");
continue; return None;
} }
match self.validator_store.produce_signed_aggregate_and_proof( match self
duty.pubkey, .validator_store
duty.validator_index, .produce_signed_aggregate_and_proof(
aggregated_attestation.clone(), duty.pubkey,
selection_proof.clone(), duty.validator_index,
) { aggregated_attestation.clone(),
Ok(aggregate) => signed_aggregate_and_proofs.push(aggregate), selection_proof.clone(),
)
.await
{
Ok(aggregate) => Some(aggregate),
Err(e) => { Err(e) => {
crit!( crit!(
log, log,
"Failed to sign attestation"; "Failed to sign attestation";
"error" => ?e "error" => ?e,
"pubkey" => ?duty.pubkey,
); );
continue; None
} }
} }
} });
// Execute all the futures in parallel, collecting any successful results.
let signed_aggregate_and_proofs = join_all(signing_futures)
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();
if !signed_aggregate_and_proofs.is_empty() { if !signed_aggregate_and_proofs.is_empty() {
let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice(); let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice();

View File

@ -240,6 +240,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let randao_reveal = self let randao_reveal = self
.validator_store .validator_store
.randao_reveal(validator_pubkey, slot.epoch(E::slots_per_epoch())) .randao_reveal(validator_pubkey, slot.epoch(E::slots_per_epoch()))
.await
.map_err(|e| format!("Unable to produce randao reveal signature: {:?}", e))? .map_err(|e| format!("Unable to produce randao reveal signature: {:?}", e))?
.into(); .into();
@ -276,6 +277,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let signed_block = self_ref let signed_block = self_ref
.validator_store .validator_store
.sign_block(*validator_pubkey_ref, block, current_slot) .sign_block(*validator_pubkey_ref, block, current_slot)
.await
.map_err(|e| format!("Unable to sign block: {:?}", e))?; .map_err(|e| format!("Unable to sign block: {:?}", e))?;
let _post_timer = metrics::start_timer_vec( let _post_timer = metrics::start_timer_vec(

View File

@ -16,6 +16,7 @@ use crate::{
}; };
use environment::RuntimeContext; use environment::RuntimeContext;
use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId}; use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId};
use futures::future::join_all;
use parking_lot::RwLock; use parking_lot::RwLock;
use safe_arith::ArithError; use safe_arith::ArithError;
use slog::{debug, error, info, warn, Logger}; use slog::{debug, error, info, warn, Logger};
@ -64,13 +65,14 @@ pub struct DutyAndProof {
impl DutyAndProof { impl DutyAndProof {
/// Instantiate `Self`, computing the selection proof as well. /// Instantiate `Self`, computing the selection proof as well.
pub fn new<T: SlotClock + 'static, E: EthSpec>( pub async fn new<T: SlotClock + 'static, E: EthSpec>(
duty: AttesterData, duty: AttesterData,
validator_store: &ValidatorStore<T, E>, validator_store: &ValidatorStore<T, E>,
spec: &ChainSpec, spec: &ChainSpec,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let selection_proof = validator_store let selection_proof = validator_store
.produce_selection_proof(duty.pubkey, duty.slot) .produce_selection_proof(duty.pubkey, duty.slot)
.await
.map_err(Error::FailedToProduceSelectionProof)?; .map_err(Error::FailedToProduceSelectionProof)?;
let selection_proof = selection_proof let selection_proof = selection_proof
@ -637,56 +639,77 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
let dependent_root = response.dependent_root; let dependent_root = response.dependent_root;
let relevant_duties = response // Filter any duties that are not relevant or already known.
.data let new_duties = {
.into_iter() // Avoid holding the read-lock for any longer than required.
.filter(|attester_duty| local_pubkeys.contains(&attester_duty.pubkey)) let attesters = duties_service.attesters.read();
.collect::<Vec<_>>(); response
.data
.into_iter()
.filter(|duty| local_pubkeys.contains(&duty.pubkey))
.filter(|duty| {
// Only update the duties if either is true:
//
// - There were no known duties for this epoch.
// - The dependent root has changed, signalling a re-org.
attesters.get(&duty.pubkey).map_or(true, |duties| {
duties
.get(&epoch)
.map_or(true, |(prior, _)| *prior != dependent_root)
})
})
.collect::<Vec<_>>()
};
debug!( debug!(
log, log,
"Downloaded attester duties"; "Downloaded attester duties";
"dependent_root" => %dependent_root, "dependent_root" => %dependent_root,
"num_relevant_duties" => relevant_duties.len(), "num_new_duties" => new_duties.len(),
); );
// Produce the `DutyAndProof` messages in parallel.
let duty_and_proof_results = join_all(new_duties.into_iter().map(|duty| {
DutyAndProof::new(duty, &duties_service.validator_store, &duties_service.spec)
}))
.await;
// Update the duties service with the new `DutyAndProof` messages.
let mut attesters = duties_service.attesters.write();
let mut already_warned = Some(()); let mut already_warned = Some(());
let mut attesters_map = duties_service.attesters.write(); for result in duty_and_proof_results {
for duty in relevant_duties { let duty_and_proof = match result {
let attesters_map = attesters_map.entry(duty.pubkey).or_default(); Ok(duty_and_proof) => duty_and_proof,
Err(e) => {
error!(
log,
"Failed to produce duty and proof";
"error" => ?e,
"msg" => "may impair attestation duties"
);
// Do not abort the entire batch for a single failure.
continue;
}
};
// Only update the duties if either is true: let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default();
//
// - There were no known duties for this epoch. if let Some((prior_dependent_root, _)) =
// - The dependent root has changed, signalling a re-org. attester_map.insert(epoch, (dependent_root, duty_and_proof))
if attesters_map
.get(&epoch)
.map_or(true, |(prior, _)| *prior != dependent_root)
{ {
let duty_and_proof = // Using `already_warned` avoids excessive logs.
DutyAndProof::new(duty, &duties_service.validator_store, &duties_service.spec)?; if dependent_root != prior_dependent_root && already_warned.take().is_some() {
warn!(
if let Some((prior_dependent_root, _)) = log,
attesters_map.insert(epoch, (dependent_root, duty_and_proof)) "Attester duties re-org";
{ "prior_dependent_root" => %prior_dependent_root,
// Using `already_warned` avoids excessive logs. "dependent_root" => %dependent_root,
if dependent_root != prior_dependent_root && already_warned.take().is_some() { "msg" => "this may happen from time to time"
warn!( )
log,
"Attester duties re-org";
"prior_dependent_root" => %prior_dependent_root,
"dependent_root" => %dependent_root,
"msg" => "this may happen from time to time"
)
}
} }
} }
} }
// Drop the write-lock. drop(attesters);
//
// This is strictly unnecessary since the function ends immediately afterwards, but we remain
// defensive regardless.
drop(attesters_map);
Ok(()) Ok(())
} }

View File

@ -2,6 +2,7 @@ use crate::{
doppelganger_service::DoppelgangerStatus, doppelganger_service::DoppelgangerStatus,
duties_service::{DutiesService, Error}, duties_service::{DutiesService, Error},
}; };
use futures::future::join_all;
use itertools::Itertools; use itertools::Itertools;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use slog::{crit, debug, info, warn}; use slog::{crit, debug, info, warn};
@ -330,8 +331,8 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
if !new_pre_compute_duties.is_empty() { if !new_pre_compute_duties.is_empty() {
let sub_duties_service = duties_service.clone(); let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn_blocking( duties_service.context.executor.spawn(
move || { async move {
fill_in_aggregation_proofs( fill_in_aggregation_proofs(
sub_duties_service, sub_duties_service,
&new_pre_compute_duties, &new_pre_compute_duties,
@ -339,6 +340,7 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
current_epoch, current_epoch,
current_pre_compute_epoch, current_pre_compute_epoch,
) )
.await
}, },
"duties_service_sync_selection_proofs", "duties_service_sync_selection_proofs",
); );
@ -370,8 +372,8 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
if !new_pre_compute_duties.is_empty() { if !new_pre_compute_duties.is_empty() {
let sub_duties_service = duties_service.clone(); let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn_blocking( duties_service.context.executor.spawn(
move || { async move {
fill_in_aggregation_proofs( fill_in_aggregation_proofs(
sub_duties_service, sub_duties_service,
&new_pre_compute_duties, &new_pre_compute_duties,
@ -379,6 +381,7 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
current_epoch, current_epoch,
pre_compute_epoch, pre_compute_epoch,
) )
.await
}, },
"duties_service_sync_selection_proofs", "duties_service_sync_selection_proofs",
); );
@ -468,7 +471,7 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: Et
Ok(()) Ok(())
} }
pub fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>( pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
duties_service: Arc<DutiesService<T, E>>, duties_service: Arc<DutiesService<T, E>>,
pre_compute_duties: &[(Epoch, SyncDuty)], pre_compute_duties: &[(Epoch, SyncDuty)],
sync_committee_period: u64, sync_committee_period: u64,
@ -487,60 +490,54 @@ pub fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
// Generate selection proofs for each validator at each slot, one epoch at a time. // Generate selection proofs for each validator at each slot, one epoch at a time.
for epoch in (current_epoch.as_u64()..=pre_compute_epoch.as_u64()).map(Epoch::new) { for epoch in (current_epoch.as_u64()..=pre_compute_epoch.as_u64()).map(Epoch::new) {
// Generate proofs. let mut validator_proofs = vec![];
let validator_proofs: Vec<(u64, Vec<_>)> = pre_compute_duties for (validator_start_epoch, duty) in pre_compute_duties {
.iter() // Proofs are already known at this epoch for this validator.
.filter_map(|(validator_start_epoch, duty)| { if epoch < *validator_start_epoch {
// Proofs are already known at this epoch for this validator. continue;
if epoch < *validator_start_epoch { }
return None;
let subnet_ids = match duty.subnet_ids::<E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
log,
"Arithmetic error computing subnet IDs";
"error" => ?e,
);
continue;
} }
};
let subnet_ids = duty // Create futures to produce proofs.
.subnet_ids::<E>() let duties_service_ref = &duties_service;
.map_err(|e| { let futures = epoch
crit!( .slot_iter(E::slots_per_epoch())
log, .cartesian_product(&subnet_ids)
"Arithmetic error computing subnet IDs"; .map(|(duty_slot, subnet_id)| async move {
"error" => ?e, // Construct proof for prior slot.
); let slot = duty_slot - 1;
})
.ok()?;
let proofs = epoch let proof = match duties_service_ref
.slot_iter(E::slots_per_epoch()) .validator_store
.cartesian_product(&subnet_ids) .produce_sync_selection_proof(&duty.pubkey, slot, *subnet_id)
.filter_map(|(duty_slot, &subnet_id)| { .await
// Construct proof for prior slot. {
let slot = duty_slot - 1; Ok(proof) => proof,
Err(e) => {
warn!(
log,
"Unable to sign selection proof";
"error" => ?e,
"pubkey" => ?duty.pubkey,
"slot" => slot,
);
return None;
}
};
let proof = duties_service match proof.is_aggregator::<E>() {
.validator_store Ok(true) => {
.produce_sync_selection_proof(&duty.pubkey, slot, subnet_id)
.map_err(|_| {
warn!(
log,
"Pubkey missing when signing selection proof";
"pubkey" => ?duty.pubkey,
"slot" => slot,
);
})
.ok()?;
let is_aggregator = proof
.is_aggregator::<E>()
.map_err(|e| {
warn!(
log,
"Error determining is_aggregator";
"pubkey" => ?duty.pubkey,
"slot" => slot,
"error" => ?e,
);
})
.ok()?;
if is_aggregator {
debug!( debug!(
log, log,
"Validator is sync aggregator"; "Validator is sync aggregator";
@ -548,16 +545,31 @@ pub fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
"slot" => slot, "slot" => slot,
"subnet_id" => %subnet_id, "subnet_id" => %subnet_id,
); );
Some(((slot, subnet_id), proof)) Some(((slot, *subnet_id), proof))
} else { }
Ok(false) => None,
Err(e) => {
warn!(
log,
"Error determining is_aggregator";
"pubkey" => ?duty.pubkey,
"slot" => slot,
"error" => ?e,
);
None None
} }
}) }
.collect(); });
Some((duty.validator_index, proofs)) // Execute all the futures in parallel, collecting any successful results.
}) let proofs = join_all(futures)
.collect(); .await
.into_iter()
.flatten()
.collect::<Vec<_>>();
validator_proofs.push((duty.validator_index, proofs));
}
// Add to global storage (we add regularly so the proofs can be used ASAP). // Add to global storage (we add regularly so the proofs can be used ASAP).
let sync_map = duties_service.sync_duties.committees.read(); let sync_map = duties_service.sync_duties.committees.read();

View File

@ -1,4 +1,5 @@
use crate::ValidatorStore; use crate::ValidatorStore;
use account_utils::validator_definitions::{SigningDefinition, ValidatorDefinition};
use account_utils::{ use account_utils::{
eth2_wallet::{bip39::Mnemonic, WalletBuilder}, eth2_wallet::{bip39::Mnemonic, WalletBuilder},
random_mnemonic, random_password, ZeroizeString, random_mnemonic, random_password, ZeroizeString,
@ -21,7 +22,7 @@ use validator_dir::Builder as ValidatorDirBuilder;
/// ///
/// If `key_derivation_path_offset` is supplied then the EIP-2334 validator index will start at /// If `key_derivation_path_offset` is supplied then the EIP-2334 validator index will start at
/// this point. /// this point.
pub async fn create_validators<P: AsRef<Path>, T: 'static + SlotClock, E: EthSpec>( pub async fn create_validators_mnemonic<P: AsRef<Path>, T: 'static + SlotClock, E: EthSpec>(
mnemonic_opt: Option<Mnemonic>, mnemonic_opt: Option<Mnemonic>,
key_derivation_path_offset: Option<u32>, key_derivation_path_offset: Option<u32>,
validator_requests: &[api_types::ValidatorRequest], validator_requests: &[api_types::ValidatorRequest],
@ -159,3 +160,33 @@ pub async fn create_validators<P: AsRef<Path>, T: 'static + SlotClock, E: EthSpe
Ok((validators, mnemonic)) Ok((validators, mnemonic))
} }
pub async fn create_validators_web3signer<T: 'static + SlotClock, E: EthSpec>(
validator_requests: &[api_types::Web3SignerValidatorRequest],
validator_store: &ValidatorStore<T, E>,
) -> Result<(), warp::Rejection> {
for request in validator_requests {
let validator_definition = ValidatorDefinition {
enabled: request.enable,
voting_public_key: request.voting_public_key.clone(),
graffiti: request.graffiti.clone(),
description: request.description.clone(),
signing_definition: SigningDefinition::Web3Signer {
url: request.url.clone(),
root_certificate_path: request.root_certificate_path.clone(),
request_timeout_ms: request.request_timeout_ms,
},
};
validator_store
.add_validator(validator_definition)
.await
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to initialize validator: {:?}",
e
))
})?;
}
Ok(())
}

View File

@ -4,7 +4,7 @@ mod tests;
use crate::ValidatorStore; use crate::ValidatorStore;
use account_utils::mnemonic_from_phrase; use account_utils::mnemonic_from_phrase;
use create_validator::create_validators; use create_validator::{create_validators_mnemonic, create_validators_web3signer};
use eth2::lighthouse_vc::types::{self as api_types, PublicKey, PublicKeyBytes}; use eth2::lighthouse_vc::types::{self as api_types, PublicKey, PublicKeyBytes};
use lighthouse_version::version_with_platform; use lighthouse_version::version_with_platform;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -273,14 +273,15 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
runtime: Weak<Runtime>| { runtime: Weak<Runtime>| {
blocking_signed_json_task(signer, move || { blocking_signed_json_task(signer, move || {
if let Some(runtime) = runtime.upgrade() { if let Some(runtime) = runtime.upgrade() {
let (validators, mnemonic) = runtime.block_on(create_validators( let (validators, mnemonic) =
None, runtime.block_on(create_validators_mnemonic(
None, None,
&body, None,
&validator_dir, &body,
&validator_store, &validator_dir,
&spec, &validator_store,
))?; &spec,
))?;
let response = api_types::PostValidatorsResponseData { let response = api_types::PostValidatorsResponseData {
mnemonic: mnemonic.into_phrase().into(), mnemonic: mnemonic.into_phrase().into(),
validators, validators,
@ -322,14 +323,15 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
e e
)) ))
})?; })?;
let (validators, _mnemonic) = runtime.block_on(create_validators( let (validators, _mnemonic) =
Some(mnemonic), runtime.block_on(create_validators_mnemonic(
Some(body.key_derivation_path_offset), Some(mnemonic),
&body.validators, Some(body.key_derivation_path_offset),
&validator_dir, &body.validators,
&validator_store, &validator_dir,
&spec, &validator_store,
))?; &spec,
))?;
Ok(api_types::GenericResponse::from(validators)) Ok(api_types::GenericResponse::from(validators))
} else { } else {
Err(warp_utils::reject::custom_server_error( Err(warp_utils::reject::custom_server_error(
@ -416,6 +418,33 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
}, },
); );
// POST lighthouse/validators/web3signer
let post_validators_web3signer = warp::path("lighthouse")
.and(warp::path("validators"))
.and(warp::path("web3signer"))
.and(warp::path::end())
.and(warp::body::json())
.and(validator_store_filter.clone())
.and(signer.clone())
.and(runtime_filter.clone())
.and_then(
|body: Vec<api_types::Web3SignerValidatorRequest>,
validator_store: Arc<ValidatorStore<T, E>>,
signer,
runtime: Weak<Runtime>| {
blocking_signed_json_task(signer, move || {
if let Some(runtime) = runtime.upgrade() {
runtime.block_on(create_validators_web3signer(&body, &validator_store))?;
Ok(())
} else {
Err(warp_utils::reject::custom_server_error(
"Runtime shutdown".into(),
))
}
})
},
);
// PATCH lighthouse/validators/{validator_pubkey} // PATCH lighthouse/validators/{validator_pubkey}
let patch_validators = warp::path("lighthouse") let patch_validators = warp::path("lighthouse")
.and(warp::path("validators")) .and(warp::path("validators"))
@ -484,7 +513,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.or(warp::post().and( .or(warp::post().and(
post_validators post_validators
.or(post_validators_keystore) .or(post_validators_keystore)
.or(post_validators_mnemonic), .or(post_validators_mnemonic)
.or(post_validators_web3signer),
)) ))
.or(warp::patch().and(patch_validators)), .or(warp::patch().and(patch_validators)),
) )

View File

@ -4,7 +4,8 @@
use crate::doppelganger_service::DoppelgangerService; use crate::doppelganger_service::DoppelgangerService;
use crate::{ use crate::{
http_api::{ApiSecret, Config as HttpConfig, Context}, http_api::{ApiSecret, Config as HttpConfig, Context},
Config, InitializedValidators, ValidatorDefinitions, ValidatorStore, initialized_validators::InitializedValidators,
Config, ValidatorDefinitions, ValidatorStore,
}; };
use account_utils::{ use account_utils::{
eth2_wallet::WalletBuilder, mnemonic_from_phrase, random_mnemonic, random_password, eth2_wallet::WalletBuilder, mnemonic_from_phrase, random_mnemonic, random_password,
@ -27,6 +28,7 @@ use std::marker::PhantomData;
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use task_executor::TaskExecutor;
use tempfile::{tempdir, TempDir}; use tempfile::{tempdir, TempDir};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use tokio::sync::oneshot; use tokio::sync::oneshot;
@ -41,6 +43,7 @@ struct ApiTester {
url: SensitiveUrl, url: SensitiveUrl,
_server_shutdown: oneshot::Sender<()>, _server_shutdown: oneshot::Sender<()>,
_validator_dir: TempDir, _validator_dir: TempDir,
_runtime_shutdown: exit_future::Signal,
} }
// Builds a runtime to be used in the testing configuration. // Builds a runtime to be used in the testing configuration.
@ -85,6 +88,10 @@ impl ApiTester {
let slot_clock = let slot_clock =
TestingSlotClock::new(Slot::new(0), Duration::from_secs(0), Duration::from_secs(1)); TestingSlotClock::new(Slot::new(0), Duration::from_secs(0), Duration::from_secs(1));
let (runtime_shutdown, exit) = exit_future::signal();
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = TaskExecutor::new(runtime.clone(), exit, log.clone(), shutdown_tx);
let validator_store = ValidatorStore::<_, E>::new( let validator_store = ValidatorStore::<_, E>::new(
initialized_validators, initialized_validators,
slashing_protection, slashing_protection,
@ -92,6 +99,7 @@ impl ApiTester {
spec, spec,
Some(Arc::new(DoppelgangerService::new(log.clone()))), Some(Arc::new(DoppelgangerService::new(log.clone()))),
slot_clock, slot_clock,
executor,
log.clone(), log.clone(),
); );
@ -141,6 +149,7 @@ impl ApiTester {
client, client,
url, url,
_server_shutdown: shutdown_tx, _server_shutdown: shutdown_tx,
_runtime_shutdown: runtime_shutdown,
} }
} }
@ -425,6 +434,40 @@ impl ApiTester {
self self
} }
pub async fn create_web3signer_validators(self, s: Web3SignerValidatorScenario) -> Self {
let initial_vals = self.vals_total();
let initial_enabled_vals = self.vals_enabled();
let request: Vec<_> = (0..s.count)
.map(|i| {
let kp = Keypair::random();
Web3SignerValidatorRequest {
enable: s.enabled,
description: format!("{}", i),
graffiti: None,
voting_public_key: kp.pk,
url: format!("http://signer_{}.com/", i),
root_certificate_path: None,
request_timeout_ms: None,
}
})
.collect();
self.client
.post_lighthouse_validators_web3signer(&request)
.await
.unwrap_err();
assert_eq!(self.vals_total(), initial_vals + s.count);
if s.enabled {
assert_eq!(self.vals_enabled(), initial_enabled_vals + s.count);
} else {
assert_eq!(self.vals_enabled(), initial_enabled_vals);
};
self
}
pub async fn set_validator_enabled(self, index: usize, enabled: bool) -> Self { pub async fn set_validator_enabled(self, index: usize, enabled: bool) -> Self {
let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index]; let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index];
@ -480,6 +523,11 @@ struct KeystoreValidatorScenario {
correct_password: bool, correct_password: bool,
} }
struct Web3SignerValidatorScenario {
count: usize,
enabled: bool,
}
#[test] #[test]
fn invalid_pubkey() { fn invalid_pubkey() {
let runtime = build_runtime(); let runtime = build_runtime();
@ -677,3 +725,22 @@ fn keystore_validator_creation() {
.assert_validators_count(2); .assert_validators_count(2);
}); });
} }
#[test]
fn web3signer_validator_creation() {
let runtime = build_runtime();
let weak_runtime = Arc::downgrade(&runtime);
runtime.block_on(async {
ApiTester::new(weak_runtime)
.await
.assert_enabled_validators_count(0)
.assert_validators_count(0)
.create_web3signer_validators(Web3SignerValidatorScenario {
count: 1,
enabled: true,
})
.await
.assert_enabled_validators_count(1)
.assert_validators_count(1);
});
}

View File

@ -30,6 +30,8 @@ pub const VALIDATOR_ID_HTTP_GET: &str = "validator_id_http_get";
pub const SUBSCRIPTIONS_HTTP_POST: &str = "subscriptions_http_post"; pub const SUBSCRIPTIONS_HTTP_POST: &str = "subscriptions_http_post";
pub const UPDATE_PROPOSERS: &str = "update_proposers"; pub const UPDATE_PROPOSERS: &str = "update_proposers";
pub const SUBSCRIPTIONS: &str = "subscriptions"; pub const SUBSCRIPTIONS: &str = "subscriptions";
pub const LOCAL_KEYSTORE: &str = "local_keystore";
pub const WEB3SIGNER: &str = "web3signer";
pub use lighthouse_metrics::*; pub use lighthouse_metrics::*;
@ -138,6 +140,14 @@ lazy_static::lazy_static! {
"sync_eth2_fallback_connected", "sync_eth2_fallback_connected",
"Set to 1 if connected to atleast one synced eth2 fallback node, otherwise set to 0", "Set to 1 if connected to atleast one synced eth2 fallback node, otherwise set to 0",
); );
/*
* Signing Metrics
*/
pub static ref SIGNING_TIMES: Result<HistogramVec> = try_create_histogram_vec(
"vc_signing_times_seconds",
"Duration to obtain a signature",
&["type"]
);
} }
pub fn gather_prometheus_metrics<T: EthSpec>( pub fn gather_prometheus_metrics<T: EthSpec>(

View File

@ -6,6 +6,7 @@
//! The `InitializedValidators` struct in this file serves as the source-of-truth of which //! The `InitializedValidators` struct in this file serves as the source-of-truth of which
//! validators are managed by this validator client. //! validators are managed by this validator client.
use crate::signing_method::SigningMethod;
use account_utils::{ use account_utils::{
read_password, read_password_from_user, read_password, read_password_from_user,
validator_definitions::{ validator_definitions::{
@ -16,16 +17,26 @@ use account_utils::{
use eth2_keystore::Keystore; use eth2_keystore::Keystore;
use lighthouse_metrics::set_gauge; use lighthouse_metrics::set_gauge;
use lockfile::{Lockfile, LockfileError}; use lockfile::{Lockfile, LockfileError};
use reqwest::{Certificate, Client, Error as ReqwestError};
use slog::{debug, error, info, warn, Logger}; use slog::{debug, error, info, warn, Logger};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::fs::File; use std::fs::File;
use std::io; use std::io::{self, Read};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use types::{Graffiti, Keypair, PublicKey, PublicKeyBytes}; use types::{Graffiti, Keypair, PublicKey, PublicKeyBytes};
use url::{ParseError, Url};
use crate::key_cache; use crate::key_cache;
use crate::key_cache::KeyCache; use crate::key_cache::KeyCache;
/// Default timeout for a request to a remote signer for a signature.
///
/// Set to 12 seconds since that's the duration of a slot. A remote signer that cannot sign within
/// that time is outside the synchronous assumptions of Eth2.
const DEFAULT_REMOTE_SIGNER_REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
// Use TTY instead of stdin to capture passwords from users. // Use TTY instead of stdin to capture passwords from users.
const USE_STDIN: bool = false; const USE_STDIN: bool = false;
@ -66,6 +77,12 @@ pub enum Error {
ValidatorNotInitialized(PublicKey), ValidatorNotInitialized(PublicKey),
/// Unable to read the slot clock. /// Unable to read the slot clock.
SlotClock, SlotClock,
/// The URL for the remote signer cannot be parsed.
InvalidWeb3SignerUrl(String),
/// Unable to read the root certificate file for the remote signer.
InvalidWeb3SignerRootCertificateFile(io::Error),
InvalidWeb3SignerRootCertificate(ReqwestError),
UnableToBuildWeb3SignerClient(ReqwestError),
} }
impl From<LockfileError> for Error { impl From<LockfileError> for Error {
@ -74,23 +91,9 @@ impl From<LockfileError> for Error {
} }
} }
/// A method used by a validator to sign messages.
///
/// Presently there is only a single variant, however we expect more variants to arise (e.g.,
/// remote signing).
pub enum SigningMethod {
/// A validator that is defined by an EIP-2335 keystore on the local filesystem.
LocalKeystore {
voting_keystore_path: PathBuf,
voting_keystore_lockfile: Lockfile,
voting_keystore: Keystore,
voting_keypair: Keypair,
},
}
/// A validator that is ready to sign messages. /// A validator that is ready to sign messages.
pub struct InitializedValidator { pub struct InitializedValidator {
signing_method: SigningMethod, signing_method: Arc<SigningMethod>,
graffiti: Option<Graffiti>, graffiti: Option<Graffiti>,
/// The validators index in `state.validators`, to be updated by an external service. /// The validators index in `state.validators`, to be updated by an external service.
index: Option<u64>, index: Option<u64>,
@ -99,11 +102,13 @@ pub struct InitializedValidator {
impl InitializedValidator { impl InitializedValidator {
/// Return a reference to this validator's lockfile if it has one. /// Return a reference to this validator's lockfile if it has one.
pub fn keystore_lockfile(&self) -> Option<&Lockfile> { pub fn keystore_lockfile(&self) -> Option<&Lockfile> {
match self.signing_method { match self.signing_method.as_ref() {
SigningMethod::LocalKeystore { SigningMethod::LocalKeystore {
ref voting_keystore_lockfile, ref voting_keystore_lockfile,
.. ..
} => Some(voting_keystore_lockfile), } => Some(voting_keystore_lockfile),
// Web3Signer validators do not have any lockfiles.
SigningMethod::Web3Signer { .. } => None,
} }
} }
} }
@ -138,7 +143,7 @@ impl InitializedValidator {
return Err(Error::UnableToInitializeDisabledValidator); return Err(Error::UnableToInitializeDisabledValidator);
} }
match def.signing_definition { let signing_method = match def.signing_definition {
// Load the keystore, password, decrypt the keypair and create a lockfile for a // Load the keystore, password, decrypt the keypair and create a lockfile for a
// EIP-2335 keystore on the local filesystem. // EIP-2335 keystore on the local filesystem.
SigningDefinition::LocalKeystore { SigningDefinition::LocalKeystore {
@ -210,33 +215,77 @@ impl InitializedValidator {
let voting_keystore_lockfile = Lockfile::new(lockfile_path)?; let voting_keystore_lockfile = Lockfile::new(lockfile_path)?;
Ok(Self { SigningMethod::LocalKeystore {
signing_method: SigningMethod::LocalKeystore { voting_keystore_path,
voting_keystore_path, voting_keystore_lockfile,
voting_keystore_lockfile, voting_keystore: voting_keystore.clone(),
voting_keystore: voting_keystore.clone(), voting_keypair: Arc::new(voting_keypair),
voting_keypair, }
},
graffiti: def.graffiti.map(Into::into),
index: None,
})
} }
} SigningDefinition::Web3Signer {
url,
root_certificate_path,
request_timeout_ms,
} => {
let signing_url = build_web3_signer_url(&url, &def.voting_public_key)
.map_err(|e| Error::InvalidWeb3SignerUrl(e.to_string()))?;
let request_timeout = request_timeout_ms
.map(Duration::from_millis)
.unwrap_or(DEFAULT_REMOTE_SIGNER_REQUEST_TIMEOUT);
let builder = Client::builder().timeout(request_timeout);
let builder = if let Some(path) = root_certificate_path {
let certificate = load_pem_certificate(path)?;
builder.add_root_certificate(certificate)
} else {
builder
};
let http_client = builder
.build()
.map_err(Error::UnableToBuildWeb3SignerClient)?;
SigningMethod::Web3Signer {
signing_url,
http_client,
voting_public_key: def.voting_public_key,
}
}
};
Ok(Self {
signing_method: Arc::new(signing_method),
graffiti: def.graffiti.map(Into::into),
index: None,
})
} }
/// Returns the voting public key for this validator. /// Returns the voting public key for this validator.
pub fn voting_public_key(&self) -> &PublicKey { pub fn voting_public_key(&self) -> &PublicKey {
match &self.signing_method { match self.signing_method.as_ref() {
SigningMethod::LocalKeystore { voting_keypair, .. } => &voting_keypair.pk, SigningMethod::LocalKeystore { voting_keypair, .. } => &voting_keypair.pk,
SigningMethod::Web3Signer {
voting_public_key, ..
} => voting_public_key,
} }
} }
}
/// Returns the voting keypair for this validator. pub fn load_pem_certificate<P: AsRef<Path>>(pem_path: P) -> Result<Certificate, Error> {
pub fn voting_keypair(&self) -> &Keypair { let mut buf = Vec::new();
match &self.signing_method { File::open(&pem_path)
SigningMethod::LocalKeystore { voting_keypair, .. } => voting_keypair, .map_err(Error::InvalidWeb3SignerRootCertificateFile)?
} .read_to_end(&mut buf)
} .map_err(Error::InvalidWeb3SignerRootCertificateFile)?;
Certificate::from_pem(&buf).map_err(Error::InvalidWeb3SignerRootCertificate)
}
fn build_web3_signer_url(base_url: &str, voting_public_key: &PublicKey) -> Result<Url, ParseError> {
Url::parse(base_url)?.join(&format!(
"api/v1/eth2/sign/{}",
voting_public_key.to_string()
))
} }
/// Try to unlock `keystore` at `keystore_path` by prompting the user via `stdin`. /// Try to unlock `keystore` at `keystore_path` by prompting the user via `stdin`.
@ -325,12 +374,14 @@ impl InitializedValidators {
self.validators.iter().map(|(pubkey, _)| pubkey) self.validators.iter().map(|(pubkey, _)| pubkey)
} }
/// Returns the voting `Keypair` for a given voting `PublicKey`, if that validator is known to /// Returns the voting `Keypair` for a given voting `PublicKey`, if all are true:
/// `self` **and** the validator is enabled. ///
pub fn voting_keypair(&self, voting_public_key: &PublicKeyBytes) -> Option<&Keypair> { /// - The validator is known to `self`.
/// - The validator is enabled.
pub fn signing_method(&self, voting_public_key: &PublicKeyBytes) -> Option<Arc<SigningMethod>> {
self.validators self.validators
.get(voting_public_key) .get(voting_public_key)
.map(|v| v.voting_keypair()) .map(|v| v.signing_method.clone())
} }
/// Add a validator definition to `self`, overwriting the on-disk representation of `self`. /// Add a validator definition to `self`, overwriting the on-disk representation of `self`.
@ -431,6 +482,8 @@ impl InitializedValidators {
}; };
definitions_map.insert(*key_store.uuid(), def); definitions_map.insert(*key_store.uuid(), def);
} }
// Remote signer validators don't interact with the key cache.
SigningDefinition::Web3Signer { .. } => (),
} }
} }
@ -451,13 +504,13 @@ impl InitializedValidators {
let mut public_keys = Vec::new(); let mut public_keys = Vec::new();
for uuid in cache.uuids() { for uuid in cache.uuids() {
let def = definitions_map.get(uuid).expect("Existence checked before"); let def = definitions_map.get(uuid).expect("Existence checked before");
let pw = match &def.signing_definition { match &def.signing_definition {
SigningDefinition::LocalKeystore { SigningDefinition::LocalKeystore {
voting_keystore_password_path, voting_keystore_password_path,
voting_keystore_password, voting_keystore_password,
voting_keystore_path, voting_keystore_path,
} => { } => {
if let Some(p) = voting_keystore_password { let pw = if let Some(p) = voting_keystore_password {
p.as_ref().to_vec().into() p.as_ref().to_vec().into()
} else if let Some(path) = voting_keystore_password_path { } else if let Some(path) = voting_keystore_password_path {
read_password(path).map_err(Error::UnableToReadVotingKeystorePassword)? read_password(path).map_err(Error::UnableToReadVotingKeystorePassword)?
@ -468,11 +521,13 @@ impl InitializedValidators {
.as_ref() .as_ref()
.to_vec() .to_vec()
.into() .into()
} };
passwords.push(pw);
public_keys.push(def.voting_public_key.clone());
} }
// Remote signer validators don't interact with the key cache.
SigningDefinition::Web3Signer { .. } => (),
}; };
passwords.push(pw);
public_keys.push(def.voting_public_key.clone());
} }
//decrypt //decrypt
@ -546,6 +601,7 @@ impl InitializedValidators {
info!( info!(
self.log, self.log,
"Enabled validator"; "Enabled validator";
"signing_method" => "local_keystore",
"voting_pubkey" => format!("{:?}", def.voting_public_key), "voting_pubkey" => format!("{:?}", def.voting_public_key),
); );
@ -565,6 +621,40 @@ impl InitializedValidators {
self.log, self.log,
"Failed to initialize validator"; "Failed to initialize validator";
"error" => format!("{:?}", e), "error" => format!("{:?}", e),
"signing_method" => "local_keystore",
"validator" => format!("{:?}", def.voting_public_key)
);
// Exit on an invalid validator.
return Err(e);
}
}
}
SigningDefinition::Web3Signer { .. } => {
match InitializedValidator::from_definition(
def.clone(),
&mut key_cache,
&mut key_stores,
)
.await
{
Ok(init) => {
self.validators
.insert(init.voting_public_key().compress(), init);
info!(
self.log,
"Enabled validator";
"signing_method" => "remote_signer",
"voting_pubkey" => format!("{:?}", def.voting_public_key),
);
}
Err(e) => {
error!(
self.log,
"Failed to initialize validator";
"error" => format!("{:?}", e),
"signing_method" => "remote_signer",
"validator" => format!("{:?}", def.voting_public_key) "validator" => format!("{:?}", def.voting_public_key)
); );
@ -585,6 +675,8 @@ impl InitializedValidators {
disabled_uuids.insert(*key_store.uuid()); disabled_uuids.insert(*key_store.uuid());
} }
} }
// Remote signers do not interact with the key cache.
SigningDefinition::Web3Signer { .. } => (),
} }
info!( info!(

View File

@ -7,19 +7,22 @@ mod config;
mod duties_service; mod duties_service;
mod graffiti_file; mod graffiti_file;
mod http_metrics; mod http_metrics;
mod initialized_validators;
mod key_cache; mod key_cache;
mod notifier; mod notifier;
mod signing_method;
mod sync_committee_service; mod sync_committee_service;
mod validator_store;
mod doppelganger_service; mod doppelganger_service;
pub mod http_api; pub mod http_api;
pub mod initialized_validators;
pub mod validator_store;
pub use cli::cli_app; pub use cli::cli_app;
pub use config::Config; pub use config::Config;
use initialized_validators::InitializedValidators;
use lighthouse_metrics::set_gauge; use lighthouse_metrics::set_gauge;
use monitoring_api::{MonitoringHttpClient, ProcessType}; use monitoring_api::{MonitoringHttpClient, ProcessType};
pub use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
use crate::beacon_node_fallback::{ use crate::beacon_node_fallback::{
start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, RequireSynced, start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, RequireSynced,
@ -33,10 +36,8 @@ use duties_service::DutiesService;
use environment::RuntimeContext; use environment::RuntimeContext;
use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Timeouts}; use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Timeouts};
use http_api::ApiSecret; use http_api::ApiSecret;
use initialized_validators::InitializedValidators;
use notifier::spawn_notifier; use notifier::spawn_notifier;
use parking_lot::RwLock; use parking_lot::RwLock;
use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
use slog::{error, info, warn, Logger}; use slog::{error, info, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use slot_clock::SystemTimeSlotClock; use slot_clock::SystemTimeSlotClock;
@ -332,6 +333,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
context.eth2_config.spec.clone(), context.eth2_config.spec.clone(),
doppelganger_service.clone(), doppelganger_service.clone(),
slot_clock.clone(), slot_clock.clone(),
context.executor.clone(),
log.clone(), log.clone(),
)); ));

View File

@ -0,0 +1,222 @@
//! Provides methods for obtaining validator signatures, including:
//!
//! - Via a local `Keypair`.
//! - Via a remote signer (Web3Signer)
use crate::http_metrics::metrics;
use eth2_keystore::Keystore;
use lockfile::Lockfile;
use reqwest::Client;
use std::path::PathBuf;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::*;
use url::Url;
use web3signer::{ForkInfo, SigningRequest, SigningResponse};
pub use web3signer::Web3SignerObject;
mod web3signer;
#[derive(Debug, PartialEq)]
pub enum Error {
InconsistentDomains {
message_type_domain: Domain,
domain: Domain,
},
Web3SignerRequestFailed(String),
Web3SignerJsonParsingFailed(String),
ShuttingDown,
TokioJoin(String),
}
/// Enumerates all messages that can be signed by a validator.
pub enum SignableMessage<'a, T: EthSpec> {
RandaoReveal(Epoch),
BeaconBlock(&'a BeaconBlock<T>),
AttestationData(&'a AttestationData),
SignedAggregateAndProof(&'a AggregateAndProof<T>),
SelectionProof(Slot),
SyncSelectionProof(&'a SyncAggregatorSelectionData),
SyncCommitteeSignature {
beacon_block_root: Hash256,
slot: Slot,
},
SignedContributionAndProof(&'a ContributionAndProof<T>),
}
impl<'a, T: EthSpec> SignableMessage<'a, T> {
/// Returns the `SignedRoot` for the contained message.
///
/// The actual `SignedRoot` trait is not used since it also requires a `TreeHash` impl, which is
/// not required here.
pub fn signing_root(&self, domain: Hash256) -> Hash256 {
match self {
SignableMessage::RandaoReveal(epoch) => epoch.signing_root(domain),
SignableMessage::BeaconBlock(b) => b.signing_root(domain),
SignableMessage::AttestationData(a) => a.signing_root(domain),
SignableMessage::SignedAggregateAndProof(a) => a.signing_root(domain),
SignableMessage::SelectionProof(slot) => slot.signing_root(domain),
SignableMessage::SyncSelectionProof(s) => s.signing_root(domain),
SignableMessage::SyncCommitteeSignature {
beacon_block_root, ..
} => beacon_block_root.signing_root(domain),
SignableMessage::SignedContributionAndProof(c) => c.signing_root(domain),
}
}
}
/// A method used by a validator to sign messages.
///
/// Presently there is only a single variant, however we expect more variants to arise (e.g.,
/// remote signing).
pub enum SigningMethod {
/// A validator that is defined by an EIP-2335 keystore on the local filesystem.
LocalKeystore {
voting_keystore_path: PathBuf,
voting_keystore_lockfile: Lockfile,
voting_keystore: Keystore,
voting_keypair: Arc<Keypair>,
},
/// A validator that defers to a Web3Signer server for signing.
///
/// See: https://docs.web3signer.consensys.net/en/latest/
Web3Signer {
signing_url: Url,
http_client: Client,
voting_public_key: PublicKey,
},
}
/// The additional information used to construct a signature. Mostly used for protection from replay
/// attacks.
pub struct SigningContext {
pub domain: Domain,
pub epoch: Epoch,
pub fork: Fork,
pub genesis_validators_root: Hash256,
}
impl SigningContext {
/// Returns the `Hash256` to be mixed-in with the signature.
pub fn domain_hash(&self, spec: &ChainSpec) -> Hash256 {
spec.get_domain(
self.epoch,
self.domain,
&self.fork,
self.genesis_validators_root,
)
}
}
impl SigningMethod {
/// Return the signature of `signable_message`, with respect to the `signing_context`.
pub async fn get_signature<T: EthSpec>(
&self,
signable_message: SignableMessage<'_, T>,
signing_context: SigningContext,
spec: &ChainSpec,
executor: &TaskExecutor,
) -> Result<Signature, Error> {
let domain_hash = signing_context.domain_hash(spec);
let SigningContext {
fork,
genesis_validators_root,
..
} = signing_context;
let signing_root = signable_message.signing_root(domain_hash);
match self {
SigningMethod::LocalKeystore { voting_keypair, .. } => {
let _timer =
metrics::start_timer_vec(&metrics::SIGNING_TIMES, &[metrics::LOCAL_KEYSTORE]);
let voting_keypair = voting_keypair.clone();
// Spawn a blocking task to produce the signature. This avoids blocking the core
// tokio executor.
let signature = executor
.spawn_blocking_handle(
move || voting_keypair.sk.sign(signing_root),
"local_keystore_signer",
)
.ok_or(Error::ShuttingDown)?
.await
.map_err(|e| Error::TokioJoin(e.to_string()))?;
Ok(signature)
}
SigningMethod::Web3Signer {
signing_url,
http_client,
..
} => {
let _timer =
metrics::start_timer_vec(&metrics::SIGNING_TIMES, &[metrics::WEB3SIGNER]);
// Map the message into a Web3Signer type.
let object = match signable_message {
SignableMessage::RandaoReveal(epoch) => {
Web3SignerObject::RandaoReveal { epoch }
}
SignableMessage::BeaconBlock(block) => Web3SignerObject::beacon_block(block),
SignableMessage::AttestationData(a) => Web3SignerObject::Attestation(a),
SignableMessage::SignedAggregateAndProof(a) => {
Web3SignerObject::AggregateAndProof(a)
}
SignableMessage::SelectionProof(slot) => {
Web3SignerObject::AggregationSlot { slot }
}
SignableMessage::SyncSelectionProof(s) => {
Web3SignerObject::SyncAggregatorSelectionData(s)
}
SignableMessage::SyncCommitteeSignature {
beacon_block_root,
slot,
} => Web3SignerObject::SyncCommitteeMessage {
beacon_block_root,
slot,
},
SignableMessage::SignedContributionAndProof(c) => {
Web3SignerObject::ContributionAndProof(c)
}
};
// Determine the Web3Signer message type.
let message_type = object.message_type();
// The `fork_info` field is not required for deposits since they sign across the
// genesis fork version.
let fork_info = if let Web3SignerObject::Deposit { .. } = &object {
None
} else {
Some(ForkInfo {
fork,
genesis_validators_root,
})
};
let request = SigningRequest {
message_type,
fork_info,
signing_root,
object,
};
// Request a signature from the Web3Signer instance via HTTP(S).
let response: SigningResponse = http_client
.post(signing_url.clone())
.json(&request)
.send()
.await
.map_err(|e| Error::Web3SignerRequestFailed(e.to_string()))?
.error_for_status()
.map_err(|e| Error::Web3SignerRequestFailed(e.to_string()))?
.json()
.await
.map_err(|e| Error::Web3SignerJsonParsingFailed(e.to_string()))?;
Ok(response.signature)
}
}
}
}

View File

@ -0,0 +1,114 @@
//! Contains the types required to make JSON requests to Web3Signer servers.
use serde::{Deserialize, Serialize};
use types::*;
#[derive(Debug, PartialEq, Copy, Clone, Serialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum MessageType {
AggregationSlot,
AggregateAndProof,
Attestation,
BlockV2,
Deposit,
RandaoReveal,
VoluntaryExit,
SyncCommitteeMessage,
SyncCommitteeSelectionProof,
SyncCommitteeContributionAndProof,
}
#[derive(Debug, PartialEq, Copy, Clone, Serialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum ForkName {
Phase0,
Altair,
}
#[derive(Debug, PartialEq, Serialize)]
pub struct ForkInfo {
pub fork: Fork,
pub genesis_validators_root: Hash256,
}
#[derive(Debug, PartialEq, Serialize)]
#[serde(bound = "T: EthSpec", rename_all = "snake_case")]
pub enum Web3SignerObject<'a, T: EthSpec> {
AggregationSlot {
slot: Slot,
},
AggregateAndProof(&'a AggregateAndProof<T>),
Attestation(&'a AttestationData),
BeaconBlock {
version: ForkName,
block: &'a BeaconBlock<T>,
},
#[allow(dead_code)]
Deposit {
pubkey: PublicKeyBytes,
withdrawal_credentials: Hash256,
#[serde(with = "eth2_serde_utils::quoted_u64")]
amount: u64,
#[serde(with = "eth2_serde_utils::bytes_4_hex")]
genesis_fork_version: [u8; 4],
},
RandaoReveal {
epoch: Epoch,
},
#[allow(dead_code)]
VoluntaryExit(&'a VoluntaryExit),
SyncCommitteeMessage {
beacon_block_root: Hash256,
slot: Slot,
},
SyncAggregatorSelectionData(&'a SyncAggregatorSelectionData),
ContributionAndProof(&'a ContributionAndProof<T>),
}
impl<'a, T: EthSpec> Web3SignerObject<'a, T> {
pub fn beacon_block(block: &'a BeaconBlock<T>) -> Self {
let version = match block {
BeaconBlock::Base(_) => ForkName::Phase0,
BeaconBlock::Altair(_) => ForkName::Altair,
};
Web3SignerObject::BeaconBlock { version, block }
}
pub fn message_type(&self) -> MessageType {
match self {
Web3SignerObject::AggregationSlot { .. } => MessageType::AggregationSlot,
Web3SignerObject::AggregateAndProof(_) => MessageType::AggregateAndProof,
Web3SignerObject::Attestation(_) => MessageType::Attestation,
Web3SignerObject::BeaconBlock { .. } => MessageType::BlockV2,
Web3SignerObject::Deposit { .. } => MessageType::Deposit,
Web3SignerObject::RandaoReveal { .. } => MessageType::RandaoReveal,
Web3SignerObject::VoluntaryExit(_) => MessageType::VoluntaryExit,
Web3SignerObject::SyncCommitteeMessage { .. } => MessageType::SyncCommitteeMessage,
Web3SignerObject::SyncAggregatorSelectionData(_) => {
MessageType::SyncCommitteeSelectionProof
}
Web3SignerObject::ContributionAndProof(_) => {
MessageType::SyncCommitteeContributionAndProof
}
}
}
}
#[derive(Debug, PartialEq, Serialize)]
#[serde(bound = "T: EthSpec")]
pub struct SigningRequest<'a, T: EthSpec> {
#[serde(rename = "type")]
pub message_type: MessageType,
#[serde(skip_serializing_if = "Option::is_none")]
pub fork_info: Option<ForkInfo>,
#[serde(rename = "signingRoot")]
pub signing_root: Hash256,
#[serde(flatten)]
pub object: Web3SignerObject<'a, T>,
}
#[derive(Debug, PartialEq, Deserialize)]
pub struct SigningResponse {
pub signature: Signature,
}

View File

@ -2,6 +2,7 @@ use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::{duties_service::DutiesService, validator_store::ValidatorStore}; use crate::{duties_service::DutiesService, validator_store::ValidatorStore};
use environment::RuntimeContext; use environment::RuntimeContext;
use eth2::types::BlockId; use eth2::types::BlockId;
use futures::future::join_all;
use futures::future::FutureExt; use futures::future::FutureExt;
use slog::{crit, debug, error, info, trace, warn}; use slog::{crit, debug, error, info, trace, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
@ -182,23 +183,31 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
// Spawn one task to publish all of the sync committee signatures. // Spawn one task to publish all of the sync committee signatures.
let validator_duties = slot_duties.duties; let validator_duties = slot_duties.duties;
let service = self.clone();
self.inner.context.executor.spawn( self.inner.context.executor.spawn(
self.clone() async move {
.publish_sync_committee_signatures(slot, block_root, validator_duties) service
.map(|_| ()), .publish_sync_committee_signatures(slot, block_root, validator_duties)
.map(|_| ())
.await
},
"sync_committee_signature_publish", "sync_committee_signature_publish",
); );
let aggregators = slot_duties.aggregators; let aggregators = slot_duties.aggregators;
let service = self.clone();
self.inner.context.executor.spawn( self.inner.context.executor.spawn(
self.clone() async move {
.publish_sync_committee_aggregates( service
slot, .publish_sync_committee_aggregates(
block_root, slot,
aggregators, block_root,
aggregate_production_instant, aggregators,
) aggregate_production_instant,
.map(|_| ()), )
.map(|_| ())
.await
},
"sync_committee_aggregate_publish", "sync_committee_aggregate_publish",
); );
@ -207,42 +216,50 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
/// Publish sync committee signatures. /// Publish sync committee signatures.
async fn publish_sync_committee_signatures( async fn publish_sync_committee_signatures(
self, &self,
slot: Slot, slot: Slot,
beacon_block_root: Hash256, beacon_block_root: Hash256,
validator_duties: Vec<SyncDuty>, validator_duties: Vec<SyncDuty>,
) -> Result<(), ()> { ) -> Result<(), ()> {
let log = self.context.log().clone(); let log = self.context.log();
let committee_signatures = validator_duties // Create futures to produce sync committee signatures.
.iter() let signature_futures = validator_duties.iter().map(|duty| async move {
.filter_map(|duty| { match self
self.validator_store .validator_store
.produce_sync_committee_signature( .produce_sync_committee_signature(
slot, slot,
beacon_block_root, beacon_block_root,
duty.validator_index, duty.validator_index,
&duty.pubkey, &duty.pubkey,
) )
.map_err(|e| { .await
crit!( {
log, Ok(signature) => Some(signature),
"Failed to sign sync committee signature"; Err(e) => {
"validator_index" => duty.validator_index, crit!(
"slot" => slot, log,
"error" => ?e, "Failed to sign sync committee signature";
); "validator_index" => duty.validator_index,
}) "slot" => slot,
.ok() "error" => ?e,
}) );
None
}
}
});
// Execute all the futures in parallel, collecting any successful results.
let committee_signatures = &join_all(signature_futures)
.await
.into_iter()
.flatten()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let signatures_slice = &committee_signatures;
self.beacon_nodes self.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move { .first_success(RequireSynced::No, |beacon_node| async move {
beacon_node beacon_node
.post_beacon_pool_sync_committee_signatures(signatures_slice) .post_beacon_pool_sync_committee_signatures(committee_signatures)
.await .await
}) })
.await .await
@ -267,7 +284,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
} }
async fn publish_sync_committee_aggregates( async fn publish_sync_committee_aggregates(
self, &self,
slot: Slot, slot: Slot,
beacon_block_root: Hash256, beacon_block_root: Hash256,
aggregators: HashMap<SyncSubnetId, Vec<(u64, PublicKeyBytes, SyncSelectionProof)>>, aggregators: HashMap<SyncSubnetId, Vec<(u64, PublicKeyBytes, SyncSelectionProof)>>,
@ -276,22 +293,25 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
for (subnet_id, subnet_aggregators) in aggregators { for (subnet_id, subnet_aggregators) in aggregators {
let service = self.clone(); let service = self.clone();
self.inner.context.executor.spawn( self.inner.context.executor.spawn(
service async move {
.publish_sync_committee_aggregate_for_subnet( service
slot, .publish_sync_committee_aggregate_for_subnet(
beacon_block_root, slot,
subnet_id, beacon_block_root,
subnet_aggregators, subnet_id,
aggregate_instant, subnet_aggregators,
) aggregate_instant,
.map(|_| ()), )
.map(|_| ())
.await
},
"sync_committee_aggregate_publish_subnet", "sync_committee_aggregate_publish_subnet",
); );
} }
} }
async fn publish_sync_committee_aggregate_for_subnet( async fn publish_sync_committee_aggregate_for_subnet(
self, &self,
slot: Slot, slot: Slot,
beacon_block_root: Hash256, beacon_block_root: Hash256,
subnet_id: SyncSubnetId, subnet_id: SyncSubnetId,
@ -302,7 +322,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
let log = self.context.log(); let log = self.context.log();
let contribution = self let contribution = &self
.beacon_nodes .beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move { .first_success(RequireSynced::No, |beacon_node| async move {
let sync_contribution_data = SyncContributionData { let sync_contribution_data = SyncContributionData {
@ -335,35 +355,45 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
})? })?
.data; .data;
// Make `SignedContributionAndProof`s // Create futures to produce signed contributions.
let signed_contributions = subnet_aggregators let signature_futures = subnet_aggregators.into_iter().map(
.into_iter() |(aggregator_index, aggregator_pk, selection_proof)| async move {
.filter_map(|(aggregator_index, aggregator_pk, selection_proof)| { match self
self.validator_store .validator_store
.produce_signed_contribution_and_proof( .produce_signed_contribution_and_proof(
aggregator_index, aggregator_index,
&aggregator_pk, aggregator_pk,
contribution.clone(), contribution.clone(),
selection_proof, selection_proof,
) )
.map_err(|e| { .await
{
Ok(signed_contribution) => Some(signed_contribution),
Err(e) => {
crit!( crit!(
log, log,
"Unable to sign sync committee contribution"; "Unable to sign sync committee contribution";
"slot" => slot, "slot" => slot,
"error" => ?e, "error" => ?e,
); );
}) None
.ok() }
}) }
},
);
// Execute all the futures in parallel, collecting any successful results.
let signed_contributions = &join_all(signature_futures)
.await
.into_iter()
.flatten()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// Publish to the beacon node. // Publish to the beacon node.
let signed_contributions_slice = &signed_contributions;
self.beacon_nodes self.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move { .first_success(RequireSynced::No, |beacon_node| async move {
beacon_node beacon_node
.post_validator_contribution_and_proofs(signed_contributions_slice) .post_validator_contribution_and_proofs(signed_contributions)
.await .await
}) })
.await .await

View File

@ -1,6 +1,8 @@
use crate::{ use crate::{
doppelganger_service::DoppelgangerService, http_metrics::metrics, doppelganger_service::DoppelgangerService,
http_metrics::metrics,
initialized_validators::InitializedValidators, initialized_validators::InitializedValidators,
signing_method::{Error as SigningError, SignableMessage, SigningContext, SigningMethod},
}; };
use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString}; use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
@ -11,12 +13,13 @@ use std::iter::FromIterator;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use task_executor::TaskExecutor;
use types::{ use types::{
attestation::Error as AttestationError, graffiti::GraffitiString, Attestation, BeaconBlock, attestation::Error as AttestationError, graffiti::GraffitiString, AggregateAndProof,
ChainSpec, Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, Attestation, BeaconBlock, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, Fork,
SelectionProof, Signature, SignedAggregateAndProof, SignedBeaconBlock, Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof, Signature, SignedAggregateAndProof,
SignedContributionAndProof, SignedRoot, Slot, SyncCommitteeContribution, SyncCommitteeMessage, SignedBeaconBlock, SignedContributionAndProof, Slot, SyncAggregatorSelectionData,
SyncSelectionProof, SyncSubnetId, SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId,
}; };
use validator_dir::ValidatorDir; use validator_dir::ValidatorDir;
@ -32,6 +35,13 @@ pub enum Error {
GreaterThanCurrentSlot { slot: Slot, current_slot: Slot }, GreaterThanCurrentSlot { slot: Slot, current_slot: Slot },
GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch }, GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch },
UnableToSignAttestation(AttestationError), UnableToSignAttestation(AttestationError),
UnableToSign(SigningError),
}
impl From<SigningError> for Error {
fn from(e: SigningError) -> Self {
Error::UnableToSign(e)
}
} }
/// Number of epochs of slashing protection history to keep. /// Number of epochs of slashing protection history to keep.
@ -73,10 +83,14 @@ pub struct ValidatorStore<T, E: EthSpec> {
log: Logger, log: Logger,
doppelganger_service: Option<Arc<DoppelgangerService>>, doppelganger_service: Option<Arc<DoppelgangerService>>,
slot_clock: T, slot_clock: T,
task_executor: TaskExecutor,
_phantom: PhantomData<E>, _phantom: PhantomData<E>,
} }
impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> { impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
// All arguments are different types. Making the fields `pub` is undesired. A builder seems
// unnecessary.
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
validators: InitializedValidators, validators: InitializedValidators,
slashing_protection: SlashingDatabase, slashing_protection: SlashingDatabase,
@ -84,6 +98,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
spec: ChainSpec, spec: ChainSpec,
doppelganger_service: Option<Arc<DoppelgangerService>>, doppelganger_service: Option<Arc<DoppelgangerService>>,
slot_clock: T, slot_clock: T,
task_executor: TaskExecutor,
log: Logger, log: Logger,
) -> Self { ) -> Self {
Self { Self {
@ -95,6 +110,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
log, log,
doppelganger_service, doppelganger_service,
slot_clock, slot_clock,
task_executor,
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
@ -124,12 +140,6 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
/// Insert a new validator to `self`, where the validator is represented by an EIP-2335 /// Insert a new validator to `self`, where the validator is represented by an EIP-2335
/// keystore on the filesystem. /// keystore on the filesystem.
///
/// This function includes:
///
/// - Add the validator definition to the YAML file, saving it to the filesystem.
/// - Enable validator with the slashing protection database.
/// - If `enable == true`, start performing duties for the validator.
pub async fn add_validator_keystore<P: AsRef<Path>>( pub async fn add_validator_keystore<P: AsRef<Path>>(
&self, &self,
voting_keystore_path: P, voting_keystore_path: P,
@ -144,14 +154,28 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
) )
.map_err(|e| format!("failed to create validator definitions: {:?}", e))?; .map_err(|e| format!("failed to create validator definitions: {:?}", e))?;
validator_def.enabled = enable;
self.add_validator(validator_def).await
}
/// Insert a new validator to `self`.
///
/// This function includes:
///
/// - Adding the validator definition to the YAML file, saving it to the filesystem.
/// - Enabling the validator with the slashing protection database.
/// - If `enable == true`, starting to perform duties for the validator.
pub async fn add_validator(
&self,
validator_def: ValidatorDefinition,
) -> Result<ValidatorDefinition, String> {
let validator_pubkey = validator_def.voting_public_key.compress(); let validator_pubkey = validator_def.voting_public_key.compress();
self.slashing_protection self.slashing_protection
.register_validator(validator_pubkey) .register_validator(validator_pubkey)
.map_err(|e| format!("failed to register validator: {:?}", e))?; .map_err(|e| format!("failed to register validator: {:?}", e))?;
validator_def.enabled = enable;
if let Some(doppelganger_service) = &self.doppelganger_service { if let Some(doppelganger_service) = &self.doppelganger_service {
doppelganger_service doppelganger_service
.register_new_validator::<E, _>(validator_pubkey, &self.slot_clock)?; .register_new_validator::<E, _>(validator_pubkey, &self.slot_clock)?;
@ -260,63 +284,72 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
self.spec.fork_at_epoch(epoch) self.spec.fork_at_epoch(epoch)
} }
/// Runs `func`, providing it access to the `Keypair` corresponding to `validator_pubkey`. /// Returns a `SigningMethod` for `validator_pubkey` *only if* that validator is considered safe
/// /// by doppelganger protection.
/// This forms the canonical point for accessing the secret key of some validator. It is fn doppelganger_checked_signing_method(
/// structured as a `with_...` function since we need to pass-through a read-lock in order to &self,
/// access the keypair. validator_pubkey: PublicKeyBytes,
/// ) -> Result<Arc<SigningMethod>, Error> {
/// Access to keypairs might be restricted by other internal mechanisms (e.g., doppleganger if self.doppelganger_protection_allows_signing(validator_pubkey) {
/// protection). self.validators
.read()
.signing_method(&validator_pubkey)
.ok_or(Error::UnknownPubkey(validator_pubkey))
} else {
Err(Error::DoppelgangerProtected(validator_pubkey))
}
}
/// Returns a `SigningMethod` for `validator_pubkey` regardless of that validators doppelganger
/// protection status.
/// ///
/// ## Warning /// ## Warning
/// ///
/// This function takes a read-lock on `self.validators`. To prevent deadlocks, it is advised to /// This method should only be used for signing non-slashable messages.
/// never take any sort of concurrency lock inside this function. fn doppelganger_bypassed_signing_method(
fn with_validator_keypair<F, R>(
&self, &self,
validator_pubkey: PublicKeyBytes, validator_pubkey: PublicKeyBytes,
func: F, ) -> Result<Arc<SigningMethod>, Error> {
) -> Result<R, Error> self.validators
where .read()
F: FnOnce(&Keypair) -> R, .signing_method(&validator_pubkey)
{ .ok_or(Error::UnknownPubkey(validator_pubkey))
// If the doppelganger service is active, check to ensure it explicitly permits signing by
// this validator.
if !self.doppelganger_protection_allows_signing(validator_pubkey) {
return Err(Error::DoppelgangerProtected(validator_pubkey));
}
let validators_lock = self.validators.read();
Ok(func(
validators_lock
.voting_keypair(&validator_pubkey)
.ok_or(Error::UnknownPubkey(validator_pubkey))?,
))
} }
pub fn randao_reveal( fn signing_context(&self, domain: Domain, signing_epoch: Epoch) -> SigningContext {
SigningContext {
domain,
epoch: signing_epoch,
fork: self.fork(signing_epoch),
genesis_validators_root: self.genesis_validators_root,
}
}
pub async fn randao_reveal(
&self, &self,
validator_pubkey: PublicKeyBytes, validator_pubkey: PublicKeyBytes,
epoch: Epoch, signing_epoch: Epoch,
) -> Result<Signature, Error> { ) -> Result<Signature, Error> {
let domain = self.spec.get_domain( let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?;
epoch, let signing_context = self.signing_context(Domain::Randao, signing_epoch);
Domain::Randao,
&self.fork(epoch),
self.genesis_validators_root,
);
let message = epoch.signing_root(domain);
self.with_validator_keypair(validator_pubkey, |keypair| keypair.sk.sign(message)) let signature = signing_method
.get_signature::<E>(
SignableMessage::RandaoReveal(signing_epoch),
signing_context,
&self.spec,
&self.task_executor,
)
.await?;
Ok(signature)
} }
pub fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option<Graffiti> { pub fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option<Graffiti> {
self.validators.read().graffiti(validator_pubkey) self.validators.read().graffiti(validator_pubkey)
} }
pub fn sign_block( pub async fn sign_block(
&self, &self,
validator_pubkey: PublicKeyBytes, validator_pubkey: PublicKeyBytes,
block: BeaconBlock<E>, block: BeaconBlock<E>,
@ -336,19 +369,15 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
}); });
} }
// Check for slashing conditions. let signing_epoch = block.epoch();
let fork = self.fork(block.epoch()); let signing_context = self.signing_context(Domain::BeaconProposer, signing_epoch);
let domain = self.spec.get_domain( let domain_hash = signing_context.domain_hash(&self.spec);
block.epoch(),
Domain::BeaconProposer,
&fork,
self.genesis_validators_root,
);
// Check for slashing conditions.
let slashing_status = self.slashing_protection.check_and_insert_block_proposal( let slashing_status = self.slashing_protection.check_and_insert_block_proposal(
&validator_pubkey, &validator_pubkey,
&block.block_header(), &block.block_header(),
domain, domain_hash,
); );
match slashing_status { match slashing_status {
@ -356,9 +385,16 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
Ok(Safe::Valid) => { Ok(Safe::Valid) => {
metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SUCCESS]); metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SUCCESS]);
self.with_validator_keypair(validator_pubkey, move |keypair| { let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?;
block.sign(&keypair.sk, &fork, self.genesis_validators_root, &self.spec) let signature = signing_method
}) .get_signature(
SignableMessage::BeaconBlock(&block),
signing_context,
&self.spec,
&self.task_executor,
)
.await?;
Ok(SignedBeaconBlock::from_block(block, signature))
} }
Ok(Safe::SameData) => { Ok(Safe::SameData) => {
warn!( warn!(
@ -390,7 +426,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
} }
} }
pub fn sign_attestation( pub async fn sign_attestation(
&self, &self,
validator_pubkey: PublicKeyBytes, validator_pubkey: PublicKeyBytes,
validator_committee_position: usize, validator_committee_position: usize,
@ -406,33 +442,30 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
} }
// Checking for slashing conditions. // Checking for slashing conditions.
let fork = self.fork(attestation.data.target.epoch); let signing_epoch = attestation.data.target.epoch;
let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch);
let domain = self.spec.get_domain( let domain_hash = signing_context.domain_hash(&self.spec);
attestation.data.target.epoch,
Domain::BeaconAttester,
&fork,
self.genesis_validators_root,
);
let slashing_status = self.slashing_protection.check_and_insert_attestation( let slashing_status = self.slashing_protection.check_and_insert_attestation(
&validator_pubkey, &validator_pubkey,
&attestation.data, &attestation.data,
domain, domain_hash,
); );
match slashing_status { match slashing_status {
// We can safely sign this attestation. // We can safely sign this attestation.
Ok(Safe::Valid) => { Ok(Safe::Valid) => {
self.with_validator_keypair(validator_pubkey, |keypair| { let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?;
attestation.sign( let signature = signing_method
&keypair.sk, .get_signature::<E>(
validator_committee_position, SignableMessage::AttestationData(&attestation.data),
&fork, signing_context,
self.genesis_validators_root,
&self.spec, &self.spec,
&self.task_executor,
) )
})? .await?;
.map_err(Error::UnableToSignAttestation)?; attestation
.add_signature(&signature, validator_committee_position)
.map_err(Error::UnableToSignAttestation)?;
metrics::inc_counter_vec(&metrics::SIGNED_ATTESTATIONS_TOTAL, &[metrics::SUCCESS]); metrics::inc_counter_vec(&metrics::SIGNED_ATTESTATIONS_TOTAL, &[metrics::SUCCESS]);
@ -482,149 +515,182 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
/// ///
/// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be
/// modified by actors other than the signing validator. /// modified by actors other than the signing validator.
pub fn produce_signed_aggregate_and_proof( pub async fn produce_signed_aggregate_and_proof(
&self, &self,
validator_pubkey: PublicKeyBytes, validator_pubkey: PublicKeyBytes,
validator_index: u64, aggregator_index: u64,
aggregate: Attestation<E>, aggregate: Attestation<E>,
selection_proof: SelectionProof, selection_proof: SelectionProof,
) -> Result<SignedAggregateAndProof<E>, Error> { ) -> Result<SignedAggregateAndProof<E>, Error> {
let fork = self.fork(aggregate.data.target.epoch); let signing_epoch = aggregate.data.target.epoch;
let signing_context = self.signing_context(Domain::AggregateAndProof, signing_epoch);
let proof = self.with_validator_keypair(validator_pubkey, move |keypair| { let message = AggregateAndProof {
SignedAggregateAndProof::from_aggregate( aggregator_index,
validator_index, aggregate,
aggregate, selection_proof: selection_proof.into(),
Some(selection_proof), };
&keypair.sk,
&fork, let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?;
self.genesis_validators_root, let signature = signing_method
.get_signature(
SignableMessage::SignedAggregateAndProof(&message),
signing_context,
&self.spec, &self.spec,
&self.task_executor,
) )
})?; .await?;
metrics::inc_counter_vec(&metrics::SIGNED_AGGREGATES_TOTAL, &[metrics::SUCCESS]); metrics::inc_counter_vec(&metrics::SIGNED_AGGREGATES_TOTAL, &[metrics::SUCCESS]);
Ok(proof) Ok(SignedAggregateAndProof { message, signature })
} }
/// Produces a `SelectionProof` for the `slot`, signed by with corresponding secret key to /// Produces a `SelectionProof` for the `slot`, signed by with corresponding secret key to
/// `validator_pubkey`. /// `validator_pubkey`.
pub fn produce_selection_proof( pub async fn produce_selection_proof(
&self, &self,
validator_pubkey: PublicKeyBytes, validator_pubkey: PublicKeyBytes,
slot: Slot, slot: Slot,
) -> Result<SelectionProof, Error> { ) -> Result<SelectionProof, Error> {
// Bypass the `with_validator_keypair` function. let signing_epoch = slot.epoch(E::slots_per_epoch());
let signing_context = self.signing_context(Domain::SelectionProof, signing_epoch);
// Bypass the `with_validator_signing_method` function.
// //
// This is because we don't care about doppelganger protection when it comes to selection // This is because we don't care about doppelganger protection when it comes to selection
// proofs. They are not slashable and we need them to subscribe to subnets on the BN. // proofs. They are not slashable and we need them to subscribe to subnets on the BN.
// //
// As long as we disallow `SignedAggregateAndProof` then these selection proofs will never // As long as we disallow `SignedAggregateAndProof` then these selection proofs will never
// be published on the network. // be published on the network.
let validators_lock = self.validators.read(); let signing_method = self.doppelganger_bypassed_signing_method(validator_pubkey)?;
let keypair = validators_lock
.voting_keypair(&validator_pubkey)
.ok_or(Error::UnknownPubkey(validator_pubkey))?;
let proof = SelectionProof::new::<E>( let signature = signing_method
slot, .get_signature::<E>(
&keypair.sk, SignableMessage::SelectionProof(slot),
&self.fork(slot.epoch(E::slots_per_epoch())), signing_context,
self.genesis_validators_root, &self.spec,
&self.spec, &self.task_executor,
); )
.await
.map_err(Error::UnableToSign)?;
metrics::inc_counter_vec(&metrics::SIGNED_SELECTION_PROOFS_TOTAL, &[metrics::SUCCESS]); metrics::inc_counter_vec(&metrics::SIGNED_SELECTION_PROOFS_TOTAL, &[metrics::SUCCESS]);
Ok(proof) Ok(signature.into())
} }
/// Produce a `SyncSelectionProof` for `slot` signed by the secret key of `validator_pubkey`. /// Produce a `SyncSelectionProof` for `slot` signed by the secret key of `validator_pubkey`.
pub fn produce_sync_selection_proof( pub async fn produce_sync_selection_proof(
&self, &self,
validator_pubkey: &PublicKeyBytes, validator_pubkey: &PublicKeyBytes,
slot: Slot, slot: Slot,
subnet_id: SyncSubnetId, subnet_id: SyncSubnetId,
) -> Result<SyncSelectionProof, Error> { ) -> Result<SyncSelectionProof, Error> {
// Bypass `with_validator_keypair`: sync committee messages are not slashable. let signing_epoch = slot.epoch(E::slots_per_epoch());
let validators = self.validators.read(); let signing_context =
let voting_keypair = validators self.signing_context(Domain::SyncCommitteeSelectionProof, signing_epoch);
.voting_keypair(validator_pubkey)
.ok_or(Error::UnknownPubkey(*validator_pubkey))?; // Bypass `with_validator_signing_method`: sync committee messages are not slashable.
let signing_method = self.doppelganger_bypassed_signing_method(*validator_pubkey)?;
metrics::inc_counter_vec( metrics::inc_counter_vec(
&metrics::SIGNED_SYNC_SELECTION_PROOFS_TOTAL, &metrics::SIGNED_SYNC_SELECTION_PROOFS_TOTAL,
&[metrics::SUCCESS], &[metrics::SUCCESS],
); );
Ok(SyncSelectionProof::new::<E>( let message = SyncAggregatorSelectionData {
slot, slot,
subnet_id.into(), subcommittee_index: subnet_id.into(),
&voting_keypair.sk, };
&self.fork(slot.epoch(E::slots_per_epoch())),
self.genesis_validators_root, let signature = signing_method
&self.spec, .get_signature::<E>(
)) SignableMessage::SyncSelectionProof(&message),
signing_context,
&self.spec,
&self.task_executor,
)
.await
.map_err(Error::UnableToSign)?;
Ok(signature.into())
} }
pub fn produce_sync_committee_signature( pub async fn produce_sync_committee_signature(
&self, &self,
slot: Slot, slot: Slot,
beacon_block_root: Hash256, beacon_block_root: Hash256,
validator_index: u64, validator_index: u64,
validator_pubkey: &PublicKeyBytes, validator_pubkey: &PublicKeyBytes,
) -> Result<SyncCommitteeMessage, Error> { ) -> Result<SyncCommitteeMessage, Error> {
// Bypass `with_validator_keypair`: sync committee messages are not slashable. let signing_epoch = slot.epoch(E::slots_per_epoch());
let validators = self.validators.read(); let signing_context = self.signing_context(Domain::SyncCommittee, signing_epoch);
let voting_keypair = validators
.voting_keypair(validator_pubkey) // Bypass `with_validator_signing_method`: sync committee messages are not slashable.
.ok_or(Error::UnknownPubkey(*validator_pubkey))?; let signing_method = self.doppelganger_bypassed_signing_method(*validator_pubkey)?;
let signature = signing_method
.get_signature::<E>(
SignableMessage::SyncCommitteeSignature {
beacon_block_root,
slot,
},
signing_context,
&self.spec,
&self.task_executor,
)
.await
.map_err(Error::UnableToSign)?;
metrics::inc_counter_vec( metrics::inc_counter_vec(
&metrics::SIGNED_SYNC_COMMITTEE_MESSAGES_TOTAL, &metrics::SIGNED_SYNC_COMMITTEE_MESSAGES_TOTAL,
&[metrics::SUCCESS], &[metrics::SUCCESS],
); );
Ok(SyncCommitteeMessage::new::<E>( Ok(SyncCommitteeMessage {
slot, slot,
beacon_block_root, beacon_block_root,
validator_index, validator_index,
&voting_keypair.sk, signature,
&self.fork(slot.epoch(E::slots_per_epoch())), })
self.genesis_validators_root,
&self.spec,
))
} }
pub fn produce_signed_contribution_and_proof( pub async fn produce_signed_contribution_and_proof(
&self, &self,
aggregator_index: u64, aggregator_index: u64,
aggregator_pubkey: &PublicKeyBytes, aggregator_pubkey: PublicKeyBytes,
contribution: SyncCommitteeContribution<E>, contribution: SyncCommitteeContribution<E>,
selection_proof: SyncSelectionProof, selection_proof: SyncSelectionProof,
) -> Result<SignedContributionAndProof<E>, Error> { ) -> Result<SignedContributionAndProof<E>, Error> {
// Bypass `with_validator_keypair`: sync committee messages are not slashable. let signing_epoch = contribution.slot.epoch(E::slots_per_epoch());
let validators = self.validators.read(); let signing_context = self.signing_context(Domain::ContributionAndProof, signing_epoch);
let voting_keypair = validators
.voting_keypair(aggregator_pubkey) // Bypass `with_validator_signing_method`: sync committee messages are not slashable.
.ok_or(Error::UnknownPubkey(*aggregator_pubkey))?; let signing_method = self.doppelganger_bypassed_signing_method(aggregator_pubkey)?;
let fork = self.fork(contribution.slot.epoch(E::slots_per_epoch()));
let message = ContributionAndProof {
aggregator_index,
contribution,
selection_proof: selection_proof.into(),
};
let signature = signing_method
.get_signature(
SignableMessage::SignedContributionAndProof(&message),
signing_context,
&self.spec,
&self.task_executor,
)
.await
.map_err(Error::UnableToSign)?;
metrics::inc_counter_vec( metrics::inc_counter_vec(
&metrics::SIGNED_SYNC_COMMITTEE_CONTRIBUTIONS_TOTAL, &metrics::SIGNED_SYNC_COMMITTEE_CONTRIBUTIONS_TOTAL,
&[metrics::SUCCESS], &[metrics::SUCCESS],
); );
Ok(SignedContributionAndProof::from_aggregate( Ok(SignedContributionAndProof { message, signature })
aggregator_index,
contribution,
Some(selection_proof),
&voting_keypair.sk,
&fork,
self.genesis_validators_root,
&self.spec,
))
} }
/// Prune the slashing protection database so that it remains performant. /// Prune the slashing protection database so that it remains performant.