From c5c747651821cec8e37bda7ff65e12e45ec9d585 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Thu, 16 Sep 2021 03:26:33 +0000 Subject: [PATCH] Web3Signer support for VC (#2522) [EIP-3030]: https://eips.ethereum.org/EIPS/eip-3030 [Web3Signer]: https://consensys.github.io/web3signer/web3signer-eth2.html ## Issue Addressed Resolves #2498 ## Proposed Changes Allows the VC to call out to a [Web3Signer] remote signer to obtain signatures. ## Additional Info ### Making Signing Functions `async` To allow remote signing, I needed to make all the signing functions `async`. This caused a bit of noise where I had to convert iterators into `for` loops. In `duties_service.rs` there was a particularly tricky case where we couldn't hold a write-lock across an `await`, so I had to first take a read-lock, then grab a write-lock. ### Move Signing from Core Executor Whilst implementing this feature, I noticed that we signing was happening on the core tokio executor. I suspect this was causing the executor to temporarily lock and occasionally trigger some HTTP timeouts (and potentially SQL pool timeouts, but I can't verify this). Since moving all signing into blocking tokio tasks, I noticed a distinct drop in the "atttestations_http_get" metric on a Prater node: ![http_get_times](https://user-images.githubusercontent.com/6660660/132143737-82fd3836-2e7e-445b-a143-cb347783baad.png) I think this graph indicates that freeing the core executor allows the VC to operate more smoothly. ### Refactor TaskExecutor I noticed that the `TaskExecutor::spawn_blocking_handle` function would fail to spawn tasks if it were unable to obtain handles to some metrics (this can happen if the same metric is defined twice). It seemed that a more sensible approach would be to keep spawning tasks, but without metrics. To that end, I refactored the function so that it would still function without metrics. There are no other changes made. ## TODO - [x] Restructure to support multiple signing methods. - [x] Add calls to remote signer from VC. - [x] Documentation - [x] Test all endpoints - [x] Test HTTPS certificate - [x] Allow adding remote signer validators via the API - [x] Add Altair support via [21.8.1-rc1](https://github.com/ConsenSys/web3signer/releases/tag/21.8.1-rc1) - [x] Create issue to start using latest version of web3signer. (See #2570) ## Notes - ~~Web3Signer doesn't yet support the Altair fork for Prater. See https://github.com/ConsenSys/web3signer/issues/423.~~ - ~~There is not yet a release of Web3Signer which supports Altair blocks. See https://github.com/ConsenSys/web3signer/issues/391.~~ --- Cargo.lock | 27 + Cargo.toml | 1 + book/src/SUMMARY.md | 1 + book/src/api-vc-endpoints.md | 40 ++ book/src/validator-web3signer.md | 56 ++ .../src/validator_definitions.rs | 27 +- common/eth2/src/lighthouse_vc/http_client.rs | 16 + common/eth2/src/lighthouse_vc/types.rs | 18 + common/lighthouse_metrics/src/lib.rs | 12 + common/task_executor/src/lib.rs | 56 +- consensus/types/src/attestation.rs | 31 +- .../src/sync_aggregator_selection_data.rs | 1 + testing/web3signer_tests/Cargo.toml | 35 ++ testing/web3signer_tests/build.rs | 100 +++ testing/web3signer_tests/src/lib.rs | 589 ++++++++++++++++++ testing/web3signer_tests/tls/README.md | 6 + testing/web3signer_tests/tls/cert.pem | 32 + testing/web3signer_tests/tls/config | 19 + testing/web3signer_tests/tls/generate.sh | 4 + testing/web3signer_tests/tls/key.key | 52 ++ testing/web3signer_tests/tls/key.p12 | Bin 0 -> 4197 bytes testing/web3signer_tests/tls/password.txt | 1 + validator_client/Cargo.toml | 2 + validator_client/src/attestation_service.rs | 121 ++-- validator_client/src/block_service.rs | 2 + validator_client/src/duties_service.rs | 99 +-- validator_client/src/duties_service/sync.rs | 136 ++-- .../src/http_api/create_validator.rs | 33 +- validator_client/src/http_api/mod.rs | 66 +- validator_client/src/http_api/tests.rs | 69 +- validator_client/src/http_metrics/metrics.rs | 10 + .../src/initialized_validators.rs | 182 ++++-- validator_client/src/lib.rs | 10 +- validator_client/src/signing_method.rs | 222 +++++++ .../src/signing_method/web3signer.rs | 114 ++++ .../src/sync_committee_service.rs | 152 +++-- validator_client/src/validator_store.rs | 372 ++++++----- 37 files changed, 2236 insertions(+), 478 deletions(-) create mode 100644 book/src/validator-web3signer.md create mode 100644 testing/web3signer_tests/Cargo.toml create mode 100644 testing/web3signer_tests/build.rs create mode 100644 testing/web3signer_tests/src/lib.rs create mode 100644 testing/web3signer_tests/tls/README.md create mode 100644 testing/web3signer_tests/tls/cert.pem create mode 100644 testing/web3signer_tests/tls/config create mode 100755 testing/web3signer_tests/tls/generate.sh create mode 100644 testing/web3signer_tests/tls/key.key create mode 100644 testing/web3signer_tests/tls/key.p12 create mode 100644 testing/web3signer_tests/tls/password.txt create mode 100644 validator_client/src/signing_method.rs create mode 100644 validator_client/src/signing_method/web3signer.rs diff --git a/Cargo.lock b/Cargo.lock index ff7de2527..db05d9c9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6972,6 +6972,7 @@ dependencies = [ "parking_lot", "rand 0.7.3", "rayon", + "reqwest", "ring", "safe_arith", "scrypt", @@ -6990,6 +6991,7 @@ dependencies = [ "tokio", "tree_hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "types", + "url", "validator_dir", "warp", "warp_utils", @@ -7295,6 +7297,31 @@ dependencies = [ "url", ] +[[package]] +name = "web3signer_tests" +version = "0.1.0" +dependencies = [ + "account_utils", + "environment", + "eth2_keystore", + "eth2_network_config", + "exit-future", + "futures", + "reqwest", + "serde", + "serde_derive", + "serde_json", + "serde_yaml", + "slot_clock", + "task_executor", + "tempfile", + "tokio", + "types", + "url", + "validator_client", + "zip", +] + [[package]] name = "webpki" version = "0.21.4" diff --git a/Cargo.toml b/Cargo.toml index 0be230ffd..19598f1ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,7 @@ members = [ "testing/node_test_rig", "testing/simulator", "testing/state_transition_vectors", + "testing/web3signer_tests", "validator_client", "validator_client/slashing_protection", diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md index 2bd27e1ef..7106f54e1 100644 --- a/book/src/SUMMARY.md +++ b/book/src/SUMMARY.md @@ -32,6 +32,7 @@ * [Advanced Usage](./advanced.md) * [Custom Data Directories](./advanced-datadir.md) * [Validator Graffiti](./graffiti.md) + * [Remote Signing with Web3Signer](./validator-web3signer.md) * [Database Configuration](./advanced_database.md) * [Advanced Networking](./advanced_networking.md) * [Running a Slasher](./slasher.md) diff --git a/book/src/api-vc-endpoints.md b/book/src/api-vc-endpoints.md index 58ad76ce5..16fd8ff8a 100644 --- a/book/src/api-vc-endpoints.md +++ b/book/src/api-vc-endpoints.md @@ -434,3 +434,43 @@ Typical Responses | 200 ] } ``` + +## `POST /lighthouse/validators/web3signer` + +Create any number of new validators, all of which will refer to a +[Web3Signer](https://docs.web3signer.consensys.net/en/latest/) server for signing. + +### HTTP Specification + +| Property | Specification | +| --- |--- | +Path | `/lighthouse/validators/web3signer` +Method | POST +Required Headers | [`Authorization`](./api-vc-auth-header.md) +Typical Responses | 200, 400 + +### Example Request Body + +```json +[ + { + "enable": true, + "description": "validator_one", + "graffiti": "Mr F was here", + "voting_public_key": "0xa062f95fee747144d5e511940624bc6546509eeaeae9383257a9c43e7ddc58c17c2bab4ae62053122184c381b90db380", + "url": "http://path-to-web3signer.com", + "root_certificate_path": "/path/on/vc/filesystem/to/certificate.pem", + "request_timeout_ms": 12000 + } +] +``` + +The following fields may be omitted or nullified to obtain default values: + +- `graffiti` +- `root_certificate_path` +- `request_timeout_ms` + +### Example Response Body + +*No data is included in the response body.* diff --git a/book/src/validator-web3signer.md b/book/src/validator-web3signer.md new file mode 100644 index 000000000..e65504095 --- /dev/null +++ b/book/src/validator-web3signer.md @@ -0,0 +1,56 @@ +# Remote Signing with Web3Signer + +[Web3Signer]: https://docs.web3signer.consensys.net/en/latest/ +[Consensys]: https://github.com/ConsenSys/ +[Teku]: https://github.com/consensys/teku + +[Web3Signer] is a tool by Consensys which allows *remote signing*. Remote signing is when a +Validator Client (VC) out-sources the signing of messages to remote server (e.g., via HTTPS). This +means that the VC does not hold the validator private keys. + +## Warnings + +Using a remote signer comes with risks, please read the following two warnings before proceeding: + +### Remote signing is complex and risky + +Remote signing is generally only desirable for enterprise users or users with unique security +requirements. Most users will find the separation between the Beacon Node (BN) and VC to be +sufficient *without* introducing a remote signer. + +**Using a remote signer introduces a new set of security and slashing risks and should only be +undertaken by advanced users who fully understand the risks.** + +### Web3Signer is not maintained by Lighthouse + +The [Web3Signer] tool is maintained by [Consensys], the same team that maintains [Teku]. The +Lighthouse team (Sigma Prime) does not maintain Web3Signer or make any guarantees about its safety +or effectiveness. + +## Usage + +A remote signing validator is added to Lighthouse in much the same way as one that uses a local +keystore, via the [`validator_definitions.yml`](./validator-management.md) file or via the `POST +/lighthouse/validators/web3signer` API endpoint. + +Here is an example of a `validator_definitions.yml` file containing one validator which uses a +remote signer: + +```yaml +--- +- enabled: true + voting_public_key: "0xa5566f9ec3c6e1fdf362634ebec9ef7aceb0e460e5079714808388e5d48f4ae1e12897fed1bea951c17fa389d511e477" + type: web3signer + url: "https://my-remote-signer.com:1234" + root_certificate_path: /home/paul/my-certificates/my-remote-signer.pem +``` + +When using this file, the Lighthouse VC will perform duties for the `0xa5566..` validator and defer +to the `https://my-remote-signer.com:1234` server to obtain any signatures. It will load a +"self-signed" SSL certificate from `/home/paul/my-certificates/my-remote-signer.pem` (on the +filesystem of the VC) to encrypt the communications between the VC and Web3Signer. + +> The `request_timeout_ms` key can also be specified. Use this key to override the default timeout +> with a new timeout in milliseconds. This is the timeout before requests to Web3Signer are +> considered to be failures. Setting a value that it too-long may create contention and late duties +> in the VC. Setting it too short will result in failed signatures and therefore missed duties. diff --git a/common/account_utils/src/validator_definitions.rs b/common/account_utils/src/validator_definitions.rs index 2aba4ff2e..418c0fb3c 100644 --- a/common/account_utils/src/validator_definitions.rs +++ b/common/account_utils/src/validator_definitions.rs @@ -61,6 +61,21 @@ pub enum SigningDefinition { #[serde(skip_serializing_if = "Option::is_none")] voting_keystore_password: Option, }, + /// A validator that defers to a Web3Signer HTTP server for signing. + /// + /// https://github.com/ConsenSys/web3signer + #[serde(rename = "web3signer")] + Web3Signer { + url: String, + /// Path to a .pem file. + #[serde(skip_serializing_if = "Option::is_none")] + root_certificate_path: Option, + /// Specifies a request timeout. + /// + /// The timeout is applied from when the request starts connecting until the response body has finished. + #[serde(skip_serializing_if = "Option::is_none")] + request_timeout_ms: Option, + }, } /// A validator that may be initialized by this validator client. @@ -116,6 +131,12 @@ impl ValidatorDefinition { #[derive(Default, Serialize, Deserialize)] pub struct ValidatorDefinitions(Vec); +impl From> for ValidatorDefinitions { + fn from(vec: Vec) -> Self { + Self(vec) + } +} + impl ValidatorDefinitions { /// Open an existing file or create a new, empty one if it does not exist. pub fn open_or_create>(validators_dir: P) -> Result { @@ -167,11 +188,13 @@ impl ValidatorDefinitions { let known_paths: HashSet<&PathBuf> = self .0 .iter() - .map(|def| match &def.signing_definition { + .filter_map(|def| match &def.signing_definition { SigningDefinition::LocalKeystore { voting_keystore_path, .. - } => voting_keystore_path, + } => Some(voting_keystore_path), + // A Web3Signer validator does not use a local keystore file. + SigningDefinition::Web3Signer { .. } => None, }) .collect(); diff --git a/common/eth2/src/lighthouse_vc/http_client.rs b/common/eth2/src/lighthouse_vc/http_client.rs index 16ecbf4af..cd640e615 100644 --- a/common/eth2/src/lighthouse_vc/http_client.rs +++ b/common/eth2/src/lighthouse_vc/http_client.rs @@ -313,6 +313,22 @@ impl ValidatorClientHttpClient { self.post(path, &request).await } + /// `POST lighthouse/validators/web3signer` + pub async fn post_lighthouse_validators_web3signer( + &self, + request: &[Web3SignerValidatorRequest], + ) -> Result, Error> { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("lighthouse") + .push("validators") + .push("web3signer"); + + self.post(path, &request).await + } + /// `PATCH lighthouse/validators/{validator_pubkey}` pub async fn patch_lighthouse_validators( &self, diff --git a/common/eth2/src/lighthouse_vc/types.rs b/common/eth2/src/lighthouse_vc/types.rs index 05ab74ccf..9e311c9d6 100644 --- a/common/eth2/src/lighthouse_vc/types.rs +++ b/common/eth2/src/lighthouse_vc/types.rs @@ -2,6 +2,7 @@ use account_utils::ZeroizeString; use eth2_keystore::Keystore; use graffiti::GraffitiString; use serde::{Deserialize, Serialize}; +use std::path::PathBuf; pub use crate::lighthouse::Health; pub use crate::types::{GenericResponse, VersionData}; @@ -64,3 +65,20 @@ pub struct KeystoreValidatorsPostRequest { pub keystore: Keystore, pub graffiti: Option, } + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Web3SignerValidatorRequest { + pub enable: bool, + pub description: String, + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub graffiti: Option, + pub voting_public_key: PublicKey, + pub url: String, + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub root_certificate_path: Option, + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub request_timeout_ms: Option, +} diff --git a/common/lighthouse_metrics/src/lib.rs b/common/lighthouse_metrics/src/lib.rs index ba499bc54..4b7160ae0 100644 --- a/common/lighthouse_metrics/src/lib.rs +++ b/common/lighthouse_metrics/src/lib.rs @@ -283,6 +283,18 @@ pub fn set_gauge_vec(int_gauge_vec: &Result, name: &[&str], value: } } +pub fn inc_gauge_vec(int_gauge_vec: &Result, name: &[&str]) { + if let Some(gauge) = get_int_gauge(int_gauge_vec, name) { + gauge.inc(); + } +} + +pub fn dec_gauge_vec(int_gauge_vec: &Result, name: &[&str]) { + if let Some(gauge) = get_int_gauge(int_gauge_vec, name) { + gauge.dec(); + } +} + pub fn set_gauge(gauge: &Result, value: i64) { if let Ok(gauge) = gauge { gauge.set(value); diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index 4a509897e..abf8e7834 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -237,39 +237,33 @@ impl TaskExecutor { { let log = self.log.clone(); - if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]) { - if let Some(int_gauge) = metrics::get_int_gauge(&metrics::BLOCKING_TASKS_COUNT, &[name]) - { - let int_gauge_1 = int_gauge; - let timer = metric.start_timer(); - let join_handle = if let Some(runtime) = self.runtime.upgrade() { - runtime.spawn_blocking(task) - } else { - debug!(self.log, "Couldn't spawn task. Runtime shutting down"); - return None; - }; + let timer = metrics::start_timer_vec(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]); + metrics::inc_gauge_vec(&metrics::BLOCKING_TASKS_COUNT, &[name]); - Some(async move { - let result = match join_handle.await { - Ok(result) => { - trace!(log, "Blocking task completed"; "task" => name); - Ok(result) - } - Err(e) => { - debug!(log, "Blocking task ended unexpectedly"; "error" => %e); - Err(e) - } - }; - timer.observe_duration(); - int_gauge_1.dec(); - result - }) - } else { - None - } + let join_handle = if let Some(runtime) = self.runtime.upgrade() { + runtime.spawn_blocking(task) } else { - None - } + debug!(self.log, "Couldn't spawn task. Runtime shutting down"); + return None; + }; + + let future = async move { + let result = match join_handle.await { + Ok(result) => { + trace!(log, "Blocking task completed"; "task" => name); + Ok(result) + } + Err(e) => { + debug!(log, "Blocking task ended unexpectedly"; "error" => %e); + Err(e) + } + }; + drop(timer); + metrics::dec_gauge_vec(&metrics::BLOCKING_TASKS_COUNT, &[name]); + result + }; + + Some(future) } pub fn runtime(&self) -> Weak { diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 17384857a..66d9e78a8 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -9,7 +9,7 @@ use crate::{test_utils::TestRandom, Hash256, Slot}; use super::{ AggregateSignature, AttestationData, BitList, ChainSpec, Domain, EthSpec, Fork, SecretKey, - SignedRoot, + Signature, SignedRoot, }; #[derive(Debug, PartialEq)] @@ -60,6 +60,25 @@ impl Attestation { fork: &Fork, genesis_validators_root: Hash256, spec: &ChainSpec, + ) -> Result<(), Error> { + let domain = spec.get_domain( + self.data.target.epoch, + Domain::BeaconAttester, + fork, + genesis_validators_root, + ); + let message = self.data.signing_root(domain); + + self.add_signature(&secret_key.sign(message), committee_position) + } + + /// Adds `signature` to `self` and sets the `committee_position`'th bit of `aggregation_bits` to `true`. + /// + /// Returns an `AlreadySigned` error if the `committee_position`'th bit is already `true`. + pub fn add_signature( + &mut self, + signature: &Signature, + committee_position: usize, ) -> Result<(), Error> { if self .aggregation_bits @@ -72,15 +91,7 @@ impl Attestation { .set(committee_position, true) .map_err(Error::SszTypesError)?; - let domain = spec.get_domain( - self.data.target.epoch, - Domain::BeaconAttester, - fork, - genesis_validators_root, - ); - let message = self.data.signing_root(domain); - - self.signature.add_assign(&secret_key.sign(message)); + self.signature.add_assign(signature); Ok(()) } diff --git a/consensus/types/src/sync_aggregator_selection_data.rs b/consensus/types/src/sync_aggregator_selection_data.rs index 94cdee3c3..963b9dc60 100644 --- a/consensus/types/src/sync_aggregator_selection_data.rs +++ b/consensus/types/src/sync_aggregator_selection_data.rs @@ -12,6 +12,7 @@ use tree_hash_derive::TreeHash; )] pub struct SyncAggregatorSelectionData { pub slot: Slot, + #[serde(with = "eth2_serde_utils::quoted_u64")] pub subcommittee_index: u64, } diff --git a/testing/web3signer_tests/Cargo.toml b/testing/web3signer_tests/Cargo.toml new file mode 100644 index 000000000..c5d901534 --- /dev/null +++ b/testing/web3signer_tests/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "web3signer_tests" +version = "0.1.0" +edition = "2018" + +build = "build.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] + +[dev-dependencies] +eth2_keystore = { path = "../../crypto/eth2_keystore" } +types = { path = "../../consensus/types" } +tempfile = "3.1.0" +tokio = { version = "1.10.0", features = ["rt-multi-thread", "macros"] } +reqwest = { version = "0.11.0", features = ["json","stream"] } +url = "2.2.2" +validator_client = { path = "../../validator_client" } +slot_clock = { path = "../../common/slot_clock" } +futures = "0.3.7" +exit-future = "0.2.0" +task_executor = { path = "../../common/task_executor" } +environment = { path = "../../lighthouse/environment" } +account_utils = { path = "../../common/account_utils" } +serde = "1.0.116" +serde_derive = "1.0.116" +serde_yaml = "0.8.13" +eth2_network_config = { path = "../../common/eth2_network_config" } + +[build-dependencies] +tokio = { version = "1.10.0", features = ["rt-multi-thread", "macros"] } +reqwest = { version = "0.11.0", features = ["json","stream"] } +serde_json = "1.0.58" +zip = "0.5.13" diff --git a/testing/web3signer_tests/build.rs b/testing/web3signer_tests/build.rs new file mode 100644 index 000000000..ac34b5197 --- /dev/null +++ b/testing/web3signer_tests/build.rs @@ -0,0 +1,100 @@ +//! This build script downloads the latest Web3Signer release and places it in the `OUT_DIR` so it +//! can be used for integration testing. + +use reqwest::Client; +use serde_json::Value; +use std::env; +use std::fs; +use std::path::PathBuf; +use zip::ZipArchive; + +/// Use `None` to download the latest Github release. +/// Use `Some("21.8.1")` to download a specific version. +const FIXED_VERSION_STRING: Option<&str> = None; + +#[tokio::main] +async fn main() { + let out_dir = env::var("OUT_DIR").unwrap(); + download_binary(out_dir.into()).await; +} + +pub async fn download_binary(dest_dir: PathBuf) { + let version_file = dest_dir.join("version"); + + let client = Client::builder() + // Github gives a 403 without a user agent. + .user_agent("web3signer_tests") + .build() + .unwrap(); + + let version = if let Some(version) = FIXED_VERSION_STRING { + version.to_string() + } else { + // Get the latest release of the web3 signer repo. + let latest_response: Value = client + .get("https://api.github.com/repos/ConsenSys/web3signer/releases/latest") + .send() + .await + .unwrap() + .error_for_status() + .unwrap() + .json() + .await + .unwrap(); + latest_response + .get("tag_name") + .unwrap() + .as_str() + .unwrap() + .to_string() + }; + + if version_file.exists() && fs::read(&version_file).unwrap() == version.as_bytes() { + // The latest version is already downloaded, do nothing. + return; + } else { + // Ignore the result since we don't care if the version file already exists. + let _ = fs::remove_file(&version_file); + } + + // Download the latest release zip. + let zip_url = format!("https://artifacts.consensys.net/public/web3signer/raw/names/web3signer.zip/versions/{}/web3signer-{}.zip", version, version); + let zip_response = client + .get(zip_url) + .send() + .await + .unwrap() + .error_for_status() + .unwrap() + .bytes() + .await + .unwrap(); + + // Write the zip to a file. + let zip_path = dest_dir.join(format!("{}.zip", version)); + fs::write(&zip_path, zip_response).unwrap(); + // Unzip the zip. + let mut zip_file = fs::File::open(&zip_path).unwrap(); + ZipArchive::new(&mut zip_file) + .unwrap() + .extract(&dest_dir) + .unwrap(); + + // Rename the web3signer directory so it doesn't include the version string. This ensures the + // path to the binary is predictable. + let web3signer_dir = dest_dir.join("web3signer"); + if web3signer_dir.exists() { + fs::remove_dir_all(&web3signer_dir).unwrap(); + } + fs::rename( + dest_dir.join(format!("web3signer-{}", version)), + web3signer_dir, + ) + .unwrap(); + + // Delete zip and unzipped dir. + fs::remove_file(&zip_path).unwrap(); + + // Update the version file to avoid duplicate downloads. + fs::write(&version_file, version).unwrap(); +} diff --git a/testing/web3signer_tests/src/lib.rs b/testing/web3signer_tests/src/lib.rs new file mode 100644 index 000000000..12f06f9de --- /dev/null +++ b/testing/web3signer_tests/src/lib.rs @@ -0,0 +1,589 @@ +//! This crate provides a series of integration tests between the Lighthouse `ValidatorStore` and +//! Web3Signer by Consensys. +//! +//! These tests aim to ensure that: +//! +//! - Lighthouse can issue valid requests to Web3Signer. +//! - The signatures generated by Web3Signer are identical to those which Lighthouse generates. +//! +//! There is a build script in this crate which obtains the latest version of Web3Signer and makes +//! it available via the `OUT_DIR`. + +#[cfg(all(test, unix, not(debug_assertions)))] +mod tests { + use account_utils::validator_definitions::{ + SigningDefinition, ValidatorDefinition, ValidatorDefinitions, + }; + use eth2_keystore::KeystoreBuilder; + use eth2_network_config::Eth2NetworkConfig; + use reqwest::Client; + use serde::Serialize; + use slot_clock::{SlotClock, TestingSlotClock}; + use std::env; + use std::fmt::Debug; + use std::fs::{self, File}; + use std::future::Future; + use std::path::PathBuf; + use std::process::{Child, Command, Stdio}; + use std::sync::Arc; + use std::time::{Duration, Instant}; + use task_executor::TaskExecutor; + use tempfile::TempDir; + use tokio::time::sleep; + use types::*; + use url::Url; + use validator_client::{ + initialized_validators::{load_pem_certificate, InitializedValidators}, + validator_store::ValidatorStore, + SlashingDatabase, SLASHING_PROTECTION_FILENAME, + }; + + /// If the we are unable to reach the Web3Signer HTTP API within this time out then we will + /// assume it failed to start. + const UPCHECK_TIMEOUT: Duration = Duration::from_secs(20); + + /// Set to `true` to send the Web3Signer logs to the console during tests. Logs are useful when + /// debugging. + const SUPPRESS_WEB3SIGNER_LOGS: bool = true; + + type E = MainnetEthSpec; + + /// This marker trait is implemented for objects that we wish to compare to ensure Web3Signer + /// and Lighthouse agree on signatures. + /// + /// The purpose of this trait is to prevent accidentally comparing useless values like `()`. + trait SignedObject: PartialEq + Debug {} + + impl SignedObject for Signature {} + impl SignedObject for Attestation {} + impl SignedObject for SignedBeaconBlock {} + impl SignedObject for SignedAggregateAndProof {} + impl SignedObject for SelectionProof {} + impl SignedObject for SyncSelectionProof {} + impl SignedObject for SyncCommitteeMessage {} + impl SignedObject for SignedContributionAndProof {} + + /// A file format used by Web3Signer to discover and unlock keystores. + #[derive(Serialize)] + struct Web3SignerKeyConfig { + #[serde(rename = "type")] + config_type: String, + #[serde(rename = "keyType")] + key_type: String, + #[serde(rename = "keystoreFile")] + keystore_file: String, + #[serde(rename = "keystorePasswordFile")] + keystore_password_file: String, + } + + const KEYSTORE_PASSWORD: &str = "hi mum"; + const WEB3SIGNER_LISTEN_ADDRESS: &str = "127.0.0.1"; + + /// A deterministic, arbitrary keypair. + fn testing_keypair() -> Keypair { + // Just an arbitrary secret key. + let sk = SecretKey::deserialize(&[ + 85, 40, 245, 17, 84, 193, 234, 155, 24, 234, 181, 58, 171, 193, 209, 164, 120, 147, 10, + 174, 189, 228, 119, 48, 181, 19, 117, 223, 2, 240, 7, 108, + ]) + .unwrap(); + let pk = sk.public_key(); + Keypair::from_components(pk, sk) + } + + /// The location of the Web3Signer binary generated by the build script. + fn web3signer_binary() -> PathBuf { + PathBuf::from(env::var("OUT_DIR").unwrap()) + .join("web3signer") + .join("bin") + .join("web3signer") + } + + /// The location of a directory where we keep some files for testing TLS. + fn tls_dir() -> PathBuf { + PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap()).join("tls") + } + + fn root_certificate_path() -> PathBuf { + tls_dir().join("cert.pem") + } + + /// A testing rig which holds a live Web3Signer process. + struct Web3SignerRig { + keypair: Keypair, + _keystore_dir: TempDir, + keystore_path: PathBuf, + web3signer_child: Child, + http_client: Client, + url: Url, + } + + impl Drop for Web3SignerRig { + fn drop(&mut self) { + self.web3signer_child.kill().unwrap(); + } + } + + impl Web3SignerRig { + pub async fn new(network: &str, listen_address: &str, listen_port: u16) -> Self { + let keystore_dir = TempDir::new().unwrap(); + let keypair = testing_keypair(); + let keystore = + KeystoreBuilder::new(&keypair, KEYSTORE_PASSWORD.as_bytes(), "".to_string()) + .unwrap() + .build() + .unwrap(); + let keystore_filename = "keystore.json"; + let keystore_path = keystore_dir.path().join(keystore_filename); + let keystore_file = File::create(&keystore_path).unwrap(); + keystore.to_json_writer(&keystore_file).unwrap(); + + let keystore_password_filename = "password.txt"; + let keystore_password_path = keystore_dir.path().join(keystore_password_filename); + fs::write(&keystore_password_path, KEYSTORE_PASSWORD.as_bytes()).unwrap(); + + let key_config = Web3SignerKeyConfig { + config_type: "file-keystore".to_string(), + key_type: "BLS".to_string(), + keystore_file: keystore_filename.to_string(), + keystore_password_file: keystore_password_filename.to_string(), + }; + let key_config_file = + File::create(&keystore_dir.path().join("key-config.yaml")).unwrap(); + serde_yaml::to_writer(key_config_file, &key_config).unwrap(); + + let tls_keystore_file = tls_dir().join("key.p12"); + let tls_keystore_password_file = tls_dir().join("password.txt"); + + let stdio = || { + if SUPPRESS_WEB3SIGNER_LOGS { + Stdio::null() + } else { + Stdio::inherit() + } + }; + + let web3signer_child = Command::new(web3signer_binary()) + .arg(format!( + "--key-store-path={}", + keystore_dir.path().to_str().unwrap() + )) + .arg(format!("--http-listen-host={}", listen_address)) + .arg(format!("--http-listen-port={}", listen_port)) + .arg("--tls-allow-any-client=true") + .arg(format!( + "--tls-keystore-file={}", + tls_keystore_file.to_str().unwrap() + )) + .arg(format!( + "--tls-keystore-password-file={}", + tls_keystore_password_file.to_str().unwrap() + )) + .arg("eth2") + .arg(format!("--network={}", network)) + .arg("--slashing-protection-enabled=false") + .stdout(stdio()) + .stderr(stdio()) + .spawn() + .unwrap(); + + let url = Url::parse(&format!("https://{}:{}", listen_address, listen_port)).unwrap(); + + let certificate = load_pem_certificate(root_certificate_path()).unwrap(); + let http_client = Client::builder() + .add_root_certificate(certificate) + .build() + .unwrap(); + + let s = Self { + keypair, + _keystore_dir: keystore_dir, + keystore_path, + web3signer_child, + http_client, + url, + }; + + s.wait_until_up(UPCHECK_TIMEOUT).await; + + s + } + + pub async fn wait_until_up(&self, timeout: Duration) { + let start = Instant::now(); + loop { + if self.upcheck().await.is_ok() { + return; + } else if Instant::now().duration_since(start) > timeout { + panic!("upcheck failed with timeout {:?}", timeout) + } else { + sleep(Duration::from_secs(1)).await; + } + } + } + + pub async fn upcheck(&self) -> Result<(), ()> { + let url = self.url.join("upcheck").unwrap(); + self.http_client + .get(url) + .send() + .await + .map_err(|_| ())? + .error_for_status() + .map(|_| ()) + .map_err(|_| ()) + } + } + + /// A testing rig which holds a `ValidatorStore`. + struct ValidatorStoreRig { + validator_store: Arc>, + _validator_dir: TempDir, + runtime: Arc, + _runtime_shutdown: exit_future::Signal, + } + + impl ValidatorStoreRig { + pub async fn new(validator_definitions: Vec, spec: ChainSpec) -> Self { + let log = environment::null_logger().unwrap(); + let validator_dir = TempDir::new().unwrap(); + + let validator_definitions = ValidatorDefinitions::from(validator_definitions); + let initialized_validators = InitializedValidators::from_definitions( + validator_definitions, + validator_dir.path().into(), + log.clone(), + ) + .await + .unwrap(); + + let voting_pubkeys: Vec<_> = initialized_validators.iter_voting_pubkeys().collect(); + + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let (runtime_shutdown, exit) = exit_future::signal(); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let executor = + TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx); + + let slashing_db_path = validator_dir.path().join(SLASHING_PROTECTION_FILENAME); + let slashing_protection = SlashingDatabase::open_or_create(&slashing_db_path).unwrap(); + slashing_protection + .register_validators(voting_pubkeys.iter().copied()) + .unwrap(); + + let slot_clock = + TestingSlotClock::new(Slot::new(0), Duration::from_secs(0), Duration::from_secs(1)); + + let validator_store = ValidatorStore::<_, E>::new( + initialized_validators, + slashing_protection, + Hash256::repeat_byte(42), + spec, + None, + slot_clock, + executor, + log.clone(), + ); + + Self { + validator_store: Arc::new(validator_store), + _validator_dir: validator_dir, + runtime, + _runtime_shutdown: runtime_shutdown, + } + } + + pub fn shutdown(self) { + Arc::try_unwrap(self.runtime).unwrap().shutdown_background() + } + } + + /// A testing rig which holds multiple `ValidatorStore` rigs and one `Web3Signer` rig. + /// + /// The intent of this rig is to allow testing a `ValidatorStore` using `Web3Signer` against + /// another `ValidatorStore` using a local keystore and ensure that both `ValidatorStore`s + /// behave identically. + struct TestingRig { + _signer_rig: Web3SignerRig, + validator_rigs: Vec, + validator_pubkey: PublicKeyBytes, + } + + impl Drop for TestingRig { + fn drop(&mut self) { + for rig in std::mem::take(&mut self.validator_rigs) { + rig.shutdown(); + } + } + } + + impl TestingRig { + pub async fn new(network: &str, spec: ChainSpec, listen_port: u16) -> Self { + let signer_rig = + Web3SignerRig::new(network, WEB3SIGNER_LISTEN_ADDRESS, listen_port).await; + let validator_pubkey = signer_rig.keypair.pk.clone(); + + let local_signer_validator_store = { + let validator_definition = ValidatorDefinition { + enabled: true, + voting_public_key: validator_pubkey.clone(), + graffiti: None, + description: String::default(), + signing_definition: SigningDefinition::LocalKeystore { + voting_keystore_path: signer_rig.keystore_path.clone(), + voting_keystore_password_path: None, + voting_keystore_password: Some(KEYSTORE_PASSWORD.to_string().into()), + }, + }; + ValidatorStoreRig::new(vec![validator_definition], spec.clone()).await + }; + + let remote_signer_validator_store = { + let validator_definition = ValidatorDefinition { + enabled: true, + voting_public_key: validator_pubkey.clone(), + graffiti: None, + description: String::default(), + signing_definition: SigningDefinition::Web3Signer { + url: signer_rig.url.to_string(), + root_certificate_path: Some(root_certificate_path()), + request_timeout_ms: None, + }, + }; + ValidatorStoreRig::new(vec![validator_definition], spec).await + }; + + Self { + _signer_rig: signer_rig, + validator_rigs: vec![local_signer_validator_store, remote_signer_validator_store], + validator_pubkey: PublicKeyBytes::from(&validator_pubkey), + } + } + + /// Run the `generate_sig` function across all validator stores on `self` and assert that + /// they all return the same value. + pub async fn assert_signatures_match( + self, + case_name: &str, + generate_sig: F, + ) -> Self + where + F: Fn(PublicKeyBytes, Arc>) -> R, + R: Future, + // We use the `SignedObject` trait to white-list objects for comparison. This avoids + // accidentally comparing something meaningless like a `()`. + S: SignedObject, + { + let mut prev_signature = None; + for (i, validator_rig) in self.validator_rigs.iter().enumerate() { + let signature = + generate_sig(self.validator_pubkey, validator_rig.validator_store.clone()) + .await; + + if let Some(prev_signature) = &prev_signature { + assert_eq!( + prev_signature, &signature, + "signature mismatch at index {} for case {}", + i, case_name + ); + } + + prev_signature = Some(signature) + } + assert!(prev_signature.is_some(), "sanity check"); + self + } + } + + /// Get a generic, arbitrary attestation for signing. + fn get_attestation() -> Attestation { + Attestation { + aggregation_bits: BitList::with_capacity(1).unwrap(), + data: AttestationData { + slot: <_>::default(), + index: <_>::default(), + beacon_block_root: <_>::default(), + source: Checkpoint { + epoch: <_>::default(), + root: <_>::default(), + }, + target: Checkpoint { + epoch: <_>::default(), + root: <_>::default(), + }, + }, + signature: AggregateSignature::empty(), + } + } + + /// Test all the "base" (phase 0) types. + async fn test_base_types(network: &str, listen_port: u16) { + let network_config = Eth2NetworkConfig::constant(network).unwrap().unwrap(); + let spec = &network_config.chain_spec::().unwrap(); + + TestingRig::new(network, spec.clone(), listen_port) + .await + .assert_signatures_match("randao_reveal", |pubkey, validator_store| async move { + validator_store + .randao_reveal(pubkey, Epoch::new(0)) + .await + .unwrap() + }) + .await + .assert_signatures_match("beacon_block_base", |pubkey, validator_store| async move { + let block = BeaconBlock::Base(BeaconBlockBase::empty(spec)); + let block_slot = block.slot(); + validator_store + .sign_block(pubkey, block, block_slot) + .await + .unwrap() + }) + .await + .assert_signatures_match("attestation", |pubkey, validator_store| async move { + let mut attestation = get_attestation(); + validator_store + .sign_attestation(pubkey, 0, &mut attestation, Epoch::new(0)) + .await + .unwrap(); + attestation + }) + .await + .assert_signatures_match("signed_aggregate", |pubkey, validator_store| async move { + let attestation = get_attestation(); + validator_store + .produce_signed_aggregate_and_proof( + pubkey, + 0, + attestation, + SelectionProof::from(Signature::empty()), + ) + .await + .unwrap() + }) + .await + .assert_signatures_match("selection_proof", |pubkey, validator_store| async move { + validator_store + .produce_selection_proof(pubkey, Slot::new(0)) + .await + .unwrap() + }) + .await; + } + + /// Test all the Altair types. + async fn test_altair_types(network: &str, listen_port: u16) { + let network_config = Eth2NetworkConfig::constant(network).unwrap().unwrap(); + let spec = &network_config.chain_spec::().unwrap(); + let altair_fork_slot = spec + .altair_fork_epoch + .unwrap() + .start_slot(E::slots_per_epoch()); + + TestingRig::new(network, spec.clone(), listen_port) + .await + .assert_signatures_match( + "beacon_block_altair", + |pubkey, validator_store| async move { + let mut altair_block = BeaconBlockAltair::empty(spec); + altair_block.slot = altair_fork_slot; + validator_store + .sign_block(pubkey, BeaconBlock::Altair(altair_block), altair_fork_slot) + .await + .unwrap() + }, + ) + .await + .assert_signatures_match( + "sync_selection_proof", + |pubkey, validator_store| async move { + validator_store + .produce_sync_selection_proof( + &pubkey, + altair_fork_slot, + SyncSubnetId::from(0), + ) + .await + .unwrap() + }, + ) + .await + .assert_signatures_match( + "sync_committee_signature", + |pubkey, validator_store| async move { + validator_store + .produce_sync_committee_signature( + altair_fork_slot, + Hash256::zero(), + 0, + &pubkey, + ) + .await + .unwrap() + }, + ) + .await + .assert_signatures_match( + "signed_contribution_and_proof", + |pubkey, validator_store| async move { + let contribution = SyncCommitteeContribution { + slot: altair_fork_slot, + beacon_block_root: <_>::default(), + subcommittee_index: <_>::default(), + aggregation_bits: <_>::default(), + signature: AggregateSignature::empty(), + }; + validator_store + .produce_signed_contribution_and_proof( + 0, + pubkey, + contribution, + SyncSelectionProof::from(Signature::empty()), + ) + .await + .unwrap() + }, + ) + .await; + } + + #[tokio::test] + async fn mainnet_base_types() { + test_base_types("mainnet", 4242).await + } + + /* The Altair fork for mainnet has not been announced, so this test will always fail. + * + * If this test starts failing, it's likely that the fork has been decided and we should remove + * the `#[should_panic]` + */ + #[tokio::test] + #[should_panic] + async fn mainnet_altair_types() { + test_altair_types("mainnet", 4243).await + } + + #[tokio::test] + async fn pyrmont_base_types() { + test_base_types("pyrmont", 4244).await + } + + #[tokio::test] + async fn pyrmont_altair_types() { + test_altair_types("pyrmont", 4245).await + } + + #[tokio::test] + async fn prater_base_types() { + test_base_types("prater", 4246).await + } + + #[tokio::test] + async fn prater_altair_types() { + test_altair_types("prater", 4247).await + } +} diff --git a/testing/web3signer_tests/tls/README.md b/testing/web3signer_tests/tls/README.md new file mode 100644 index 000000000..53ff97c27 --- /dev/null +++ b/testing/web3signer_tests/tls/README.md @@ -0,0 +1,6 @@ +## TLS Testing Files + +The files in this directory are used for testing TLS with web3signer. We store them in this +repository since the are not sensitive and it's simpler than regenerating them for each test. + +The files were generated using the `./generate.sh` script. diff --git a/testing/web3signer_tests/tls/cert.pem b/testing/web3signer_tests/tls/cert.pem new file mode 100644 index 000000000..7f2d5f1f2 --- /dev/null +++ b/testing/web3signer_tests/tls/cert.pem @@ -0,0 +1,32 @@ +-----BEGIN CERTIFICATE----- +MIIFmTCCA4GgAwIBAgIUd6yn4o1bKr2YpzTxcBmoiM4PorkwDQYJKoZIhvcNAQEL +BQAwajELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlZBMREwDwYDVQQHDAhTb21lQ2l0 +eTESMBAGA1UECgwJTXlDb21wYW55MRMwEQYDVQQLDApNeURpdmlzaW9uMRIwEAYD +VQQDDAkxMjcuMC4wLjEwIBcNMjEwOTA2MDgxMDU2WhgPMjEyMTA4MTMwODEwNTZa +MGoxCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJWQTERMA8GA1UEBwwIU29tZUNpdHkx +EjAQBgNVBAoMCU15Q29tcGFueTETMBEGA1UECwwKTXlEaXZpc2lvbjESMBAGA1UE +AwwJMTI3LjAuMC4xMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAx/a1 +SRqehj/D18166GcJh/zOyDtZCbeoLWcVfS1aBq+J1FFy4LYKWgwNhOYsrxHLhsIr +/LpHpRm/FFqLPxGNoEPMcJi1dLcELPcJAG1l+B0Ur52V/nxOmzn71Mi0WQv0oOFx +hOtUOToY3heVW0JXgrILhdD834mWdsxBWPhq1LeLZcMth4woMgD9AH4KzxUNtFvo +8i8IneEYvoDIQ8dGZ5lHnFV5kaC8Is0hevMljTw83E9BD0B/bpp+o2rByccVulsy +/WK763tFteDxK5eZZ3/5rRId+uoN5+D4oRnG6zuki0t7+eTZo1cUPi28IIDTNjPR +Xvw35dt+SdTDjtI/FUf8VWhLIHZZXaevFliuBbcuOMpWCdjAdwb7Uf9WpMnxzZtK +fatAC9dk3VPsehFcf6w/H+ah3tu/szAaDJ5zZb0m05cAxDZekZ9SccBIPglccM3f +vzNjrDIoi4z7uCiTJc2FW0qb2MzusQsGjtLW53n7IGoSIFDvOhiZa9D+vOE2wG6o +VNf2K9/QvwNDCzRvW81mcUCRr/BhcAmX5drwYPwUEcdBXQeFPt6nZ33fmIgl2Cbv +io9kUJzjlQWOZ6BX5FmC69dWAedcfHGY693tG6LQKk9a5B+NiuIB4m1bHcvjYhsh +GqVrw980YIN52RmIoskGRdt34/gKHWcqjIEK0+kCAwEAAaM1MDMwCwYDVR0PBAQD +AgQwMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1UdEQQIMAaHBH8AAAEwDQYJKoZI +hvcNAQELBQADggIBAILVu5ppYnumyxvchgSLAi/ahBZV/wmtI3X8vxOHuQwYF8rZ +7b2gd+PClJBuhxeOEJZTtCSDMMUdlBXsxnoftp0TcDhFXeAlSp0JQe38qGAlX94l +4ZH39g+Ut5kVpImb/nI/iQhdOSDzQHaivTMjhNlBW+0EqvVJ1YsjjovtcxXh8gbv +4lKpGkuT6xVRrSGsZh0LQiVtngKNqte8vBvFWBQfj9JFyoYmpSvYl/LaYjYkmCya +V2FbfrhDXDI0IereknqMKDs8rF4Ik6i22b+uG91yyJsRFh63x7agEngpoxYKYV6V +5YXIzH5kLX8hklHnLgVhES2ZjhheDgC8pCRUCPqR4+KVnQcFRHP9MJCqcEIFAppD +oHITdiFDs/qE0EDV9WW1iOWgBmdgxUZ8dh1CfW+7B72+Uy0/eXWdnlrRDe5cN/hs +xXpnLCMfzSDEMA4WmImabpU/fRXL7pazZENJj7iyIAr/pEL34+QjqVfWaXkWrHoN +KsrkxTdoZNVdarBDSw9JtMUECmnWYOjMaOm1O8waib9H1SlPSSPrK5pGT/6h1g0d +LM982X36Ej8XyW33E5l6qWiLVRye7SaAvZbVLsyd+cfemi6BPsK+y09eCs4a+Qp7 +9YWZOPT6s/ahJYdTGF961JZ62ypIioimW6wx8hAMCkKKfhn1WI0+0RlOrjbw +-----END CERTIFICATE----- diff --git a/testing/web3signer_tests/tls/config b/testing/web3signer_tests/tls/config new file mode 100644 index 000000000..d19a89b02 --- /dev/null +++ b/testing/web3signer_tests/tls/config @@ -0,0 +1,19 @@ +[req] +default_bits = 4096 +default_md = sha256 +distinguished_name = req_distinguished_name +x509_extensions = v3_req +prompt = no +[req_distinguished_name] +C = US +ST = VA +L = SomeCity +O = MyCompany +OU = MyDivision +CN = 127.0.0.1 +[v3_req] +keyUsage = keyEncipherment, dataEncipherment +extendedKeyUsage = serverAuth +subjectAltName = @alt_names +[alt_names] +IP.1 = 127.0.0.1 diff --git a/testing/web3signer_tests/tls/generate.sh b/testing/web3signer_tests/tls/generate.sh new file mode 100755 index 000000000..1e45bb61b --- /dev/null +++ b/testing/web3signer_tests/tls/generate.sh @@ -0,0 +1,4 @@ +#!/bin/bash +openssl req -x509 -sha256 -nodes -days 36500 -newkey rsa:4096 -keyout key.key -out cert.pem -config config && +openssl pkcs12 -export -out key.p12 -inkey key.key -in cert.pem -password pass:$(cat password.txt) + diff --git a/testing/web3signer_tests/tls/key.key b/testing/web3signer_tests/tls/key.key new file mode 100644 index 000000000..6f1331db1 --- /dev/null +++ b/testing/web3signer_tests/tls/key.key @@ -0,0 +1,52 @@ +-----BEGIN PRIVATE KEY----- +MIIJRAIBADANBgkqhkiG9w0BAQEFAASCCS4wggkqAgEAAoICAQDH9rVJGp6GP8PX +zXroZwmH/M7IO1kJt6gtZxV9LVoGr4nUUXLgtgpaDA2E5iyvEcuGwiv8ukelGb8U +Wos/EY2gQ8xwmLV0twQs9wkAbWX4HRSvnZX+fE6bOfvUyLRZC/Sg4XGE61Q5Ohje +F5VbQleCsguF0PzfiZZ2zEFY+GrUt4tlwy2HjCgyAP0AfgrPFQ20W+jyLwid4Ri+ +gMhDx0ZnmUecVXmRoLwizSF68yWNPDzcT0EPQH9umn6jasHJxxW6WzL9Yrvre0W1 +4PErl5lnf/mtEh366g3n4PihGcbrO6SLS3v55NmjVxQ+LbwggNM2M9Fe/Dfl235J +1MOO0j8VR/xVaEsgdlldp68WWK4Fty44ylYJ2MB3BvtR/1akyfHNm0p9q0AL12Td +U+x6EVx/rD8f5qHe27+zMBoMnnNlvSbTlwDENl6Rn1JxwEg+CVxwzd+/M2OsMiiL +jPu4KJMlzYVbSpvYzO6xCwaO0tbnefsgahIgUO86GJlr0P684TbAbqhU1/Yr39C/ +A0MLNG9bzWZxQJGv8GFwCZfl2vBg/BQRx0FdB4U+3qdnfd+YiCXYJu+Kj2RQnOOV +BY5noFfkWYLr11YB51x8cZjr3e0botAqT1rkH42K4gHibVsdy+NiGyEapWvD3zRg +g3nZGYiiyQZF23fj+AodZyqMgQrT6QIDAQABAoICAGMICuZGmaXxJIPXDvzUMsM3 +cA14XvNSEqdRuzHAaSqQexk8sUEaxuurtnJQMGcP0BVQSsqiUuMwahKheP7mKZbq +nPBSoONJ1HaUbc/ZXjvP4zPKPsPHOoLj55WNRMwpAKFApaDnj1G8NR6g3WZR59ch +aFWAmAv5LxxsshxnAzmQIShnzj+oKSwCk0pQIfhG+/+L2UVAB+tw1HlcfFIc+gBK +yE1jg46c5S/zGZaznrBg2d9eHOF51uKm/vrd31WYFGmzyv/0iw7ngTG/UpF9Rgsd +NUECjPh8PCDPqTLX+kz7v9UAsEiljye2856LtfT++BuK9DEvhlt/Jf9YsPUlqPl3 +3wUG8yiqBQrlGTUY1KUdHsulmbTiq4Q9ch5QLcvazk+9c7hlB6WP+/ofqgIPSlDt +fOHkROmO7GURz78lVM8+E/pRgy6qDq+yM1uVMeWWme4hKfOAL2lnJDTO4PKNQA4b +03YXsdVSz4mm9ppnyHIPXei6/qHpU/cRRf261HNEI16eC0ZnoIAxhORJtxo6kMns +am4yuhHm9qLjbOI1uJPAgpR/o0O5NaBgkdEzJ102pmv2grf2U743n9bqu+y/vJF9 +HRmMDdJgZSmcYxQuLe0INzLDnTzOdmjbqjB6lDsSwtrEo/KLtXIStrFMKSHIE/QV +96u8nWPomN83HqkVvQmBAoIBAQDrs8eKAQ3meWtmsSqlzCNVAsJA1xV4DtNaWBTz +MJXwRWywem/sHCoPsJ7c5UTUjQDOfNEUu8iW/m60dt0U+81/O9TLBP1Td6jxLg8X +92atLs8wHQDUqrgouce0lyS7to+R3K+N8YtWL2y9w9jbf/XT9iTL5TXGc8RFrmMg +nDQ1EShojU0U0I1lKpDJTx2R1FANfyd3iHSsENRwYj5MF8iQSag79Ek06BKLWHHt +OJj2oiO3VIAKQYVA9aKxfiiOWXWumPHq7r6UoNJK3UNzfBvguhEzl8k6VjZBCR9q +WwvSTba4mOgHMIXdV/9Wr3y8Cus2lX5YGOK4OUx/ZaCdaBtZAoIBAQDZLwwZDHen +Iw1412m/D/6HBS38bX78t+0hL7LNqgVpiZdNbLq57SGRbUnZZ/jlmtyLw3be6BV3 +IcLyflYW+4Wi8AAqVADlXjMC+GIuDNCCicwWxJeIFaAGM7Jt6Fa08H/loIAMM7NC +y1CmQnCR9OnHRdcBaU1y4ForP4f8B/hwh3hSQEFPKgF/MQwDnR7UzPgRrUOTovN/ +4D7j1Wx6FpYX9hGZL0i2K1ygRZE03t6VV7xhCkne96VvDEj1Zo/S4HFaEmDD+EjR +pvXVhPRed7GZ6AMs2JxOPhRiu3G+AQL1HPMDlA8QiPtTh0Zf99j/5NXKBEyH/fp1 +V04L1s7wf7sRAoIBAQCb3/ftJ0dXDSNe9Xl7ziXrmXh3wwYasMtLawbn0VDHZlI7 +36zW28VhPO/CrAi5/En1RIxNBubgHIF/7T/GGcRMCXhvjuwtX+wlG821jtKjY1p3 +uiaLfh9uJ3aP0ojjbxdBYk3jNENuisyCLtviRZyAQb8R7JKEnJjHcE10CnloQuGT +SycXxdhMeDrqNt0aTOtoEZg7L83g4PxtGjuSvQPRkDSm+aXUTEm/R42IUS6vpIi0 +PDi1D6GdVRT0BrexdC4kelc6hAsbZcPM6MkrvX7+Pm8TzKSyZMNafTr+bhnCScy2 +BcEkyA0vVXuyizmVbi8hmPnGLyb4qEQT2FTA5FF5AoIBAQCEj0vCCjMKB8IUTN7V +aGzBeq7b0PVeSODqjZOEJk9RYFLCRigejZccjWky0lw/wGr2v6JRYbSgVzIHEod3 +VaP2lKh1LXqyhPF70aETXGz0EClKiEm5HQHkZy90GAi8PcLCpFkjmXbDwRcDs6/D +1onOQFmAGgbUpA1FMmzMrwy7mmQdR+zU5d2uBYDAv+jumACdwXRqq14WYgfgxgaE +6j5Id7+8EPk/f230wSFk9NdErh1j2YTHG76U7hml9yi33JgzEt6PHn9Lv61y2sjQ +1BvJxawSdk/JDekhbil5gGKOu1G0kG01eXZ1QC77Kmr/nWvD9yXDJ4j0kAop/b2n +Wz8RAoIBAQDn1ZZGOJuVRUoql2A65zwtu34IrYD+2zQQCBf2hGHtwXT6ovqRFqPV +vcQ7KJP+zVT4GimFlZy7lUx8H4j7+/Bxn+PpUHHoDYjVURr12wk2w8pxwcKnbiIw +qaMkF5KG2IUVb7F8STEuKv4KKeuRlB4K2HC2J8GZOLXO21iOqNMhMRO11wp9jkKI +n83wtLH34lLRz4VzIW3rfvPeVoP1zoDkLvD8k/Oyjrf4Bishg9vCHyhQkB1JDtMU +1bfH8mxwKozakpJa23a8lE5NLoc9NOZrKM4+cefY1MZ3FjlaZfkS5jlhY4Qhx+fl ++9j5xRPaH+mkJHaJIqzQad+b1A2eIa+L +-----END PRIVATE KEY----- diff --git a/testing/web3signer_tests/tls/key.p12 b/testing/web3signer_tests/tls/key.p12 new file mode 100644 index 0000000000000000000000000000000000000000..2f19e57f026ee5a944632d1ffdfbfc8e92aceb1a GIT binary patch literal 4197 zcmV-r5Ss5Wf)HT>0Ru3C5GMu+Duzgg_YDCD0ic2q7zBb46flAi5HNxUKL!aZhDe6@ z4FLxRpn?W4FoFgw0s#Opf(9i92`Yw2hW8Bt2LUh~1_~;MNQUItqH+EowLDnPJJRdvl?^`SaGiDqiy_nhG4)O*G zww>H&yM))zQHwu#LfAQ5)E~($GmZSI83zVUiXRH{;BM6**_$s?X!gpjcydFd1w7UD zh51mhxh8BWq@GFnExZ5k@PgK7m+LJ0K1`^ie|SfJX^FPh9$xN7WC|)h*PiIKBnTDA z1t!H`AQb?LKmwLxH-Skr>wx{+on@Tiw6(vP_qJX)I@?C;1_9=6imdp1oj~|!<;cD` zYn_K*9-e_dcG2?J`*q`k8qF~;CE*8aQ0VuWoS$>ya7kHd*IT;7-W;F$3SPh#mLOQG zSfp*7BG+A$46~{n4PxGZTv##pm-8xz=#Q3`B7Z=Zd~5G#%^X3BE}OFdD$A6{`zxZJ zg8Uhe{Qa7hDLo(}vsQ>(=B4-pnfg92+Dq##@+xg^)hu)(*3aK6%}`C`_X2Di;--lV zaY(fGu$`-_?PlCn9l<{)K?X_odW<8RC~pWGJ-7}{p93hBekjNmw$Mcp;GdQK5yysc zMli3mxmoa330NG5esYm+Pmba9^>8uMXjmVvHKuuk&ozPkg;}4l&CoozVcA_drin`q2K(D^NorG zqIsTkYXsN0z7cbH8TyihM@ks9R0AT%cvbZ>m!cKNhnwb#qtL?cN|(nx*d8M+#dFVo zQ0HvD{l2v7?nxlfV{t!qku~342nV9<_N#hv)FCuPiI!^PLJ z*?MLZw%(tBoT*7ovQ#_6RQlP>Tu@BB?O*Nxuf`wrFsw}ofFDq{W=JQl6*@*Gzeg# z6nMyNUMSz<)clZy2!=OD5p%H~(k)Jf%T&s;79}F2AFQ8Z0;S2cG-2@1&8~X%Uz=AG zHQr7?klLHgCYM&C8pa~l9*NfZ8);yXci-yM6N0LIH}HCEE}R^p__WJtt=eIV~EcQReE|3hzFx2w{ZAXfFqC* zef8-q0vD(S(bm0ecz+Dq4kF`K4Zk#+<3?+uUJ*=!(PWdG){GO;g(e`>T}~O4FED`b zsMu-0Z#@g?1lx(j_kofyj@Ckq@=q$+;Q*Qntb6E20Ahcq2IdsaPB&$LdG9nBbZPZS zZw9^eR*U5dIe+?6=~GYWvH~ePEWjIU-L_{Dm@x)uU!yS~mJwVNUloEp#sk9zaR~2P zQ%;c42i^JaY^QnXjl@kPk2XcW8{mzXZ}gi9ZHy@XwX&zPjw=9>fero!q6_f*nEyCX z0*wFInbInYz0@1$*r^fqVm?G`2{jev4jJ>J@?5r+nxE}eJgHgn%{L30HzGF!!!TF- zEiuksgt%8B%dr-?eL2*S3C}w4o9O5$qT2B30Pi!Kil(5#ptquqFo&LNQUp zu?O8JQVaqD2ml0v2~ev;g{28v66kt1KU=0bPjgH-jvp(!*7W=54~hm{o8e7~pV&bd z+}rxDW_cIro;liiD_FW%%gnRSQ9%0a8(%sEHIXjeL-$Gst>&V{hDFFmi*%g1O&q;> z;bjkk{~Bde#FQjo6kYeo^AJwNBR!+?TT|Qn==QBa2DU@pSgbZL9RnStP8&lh=@t= zzRR`k7{tY3Kp@=izz9;h$Y;Q5!dW|tb;huk;?v6gYiT#|g|WpcV#KW#UI*(V`ANK` zR@Mlk*vt(roJ1?}%k%?KuV3kk~U( zNJ&0WW^ciKGV^|3D@A*o$?xuz4d8Mlh4aK@`WC4kl4YkVwOC#MYl3 z%b&A2T=bR2LJg+Rpk<{+rX4L-kM-Q>X20h#%Cp+_23{rZ`EUS5e1yJN*2@&zQut>?0J7nqQk8}k&cSzz1n5zsCPR^T;_Wo}nhVifod7L8<9PdE6-Wzu?pE=8hT#go=$B>O`GQfioWaSC%MUkvKtQ&FJutrf5) z?+ZdBa=-+}Nla3wKXk@&MXW$=jBh82rMlY9eKRxjQHh;>gZi_On`m#4g0A$KB0!C; zV?`2G_me7Educ8t4l-Kcs!F1wDvOD55Rd<_RP*#%#utZaBvHO4Hl+Zi9X>6*3#Yb> zOg@ycTA|oY0bSt-qa~Kjf?Loqpl}H@ao?L<{qd>6j;Q#D1Dm+LLpVUg?p8f89szH& z6wSlVpI|j?2*O{vfp^h5XYI}npmQ;l!x=Xr2FI=Oaw|F1sYsk5g;#Fq3f3Ic^`??x zIA8ys$i8*9@A^@-Z@1RqkT-`~UtMdpYOLyPM(D5PGUCZd=!nPxlwz5e9#kz9wvR@k zKM3LX>TKd-XJm(W6RR>!oEct*cR>0);Hk6H#|-07j_E% zJhs6FVSt-z$r+!}(8MubUpDw(>QkT_{(iIi`@YY;PoLm>$jrx8gfzZ{Vs@inrV81X zw*fvawlOrHKeQ-qU`PIep>me-5oINt^xuf?1pKO%WrUY}YgYX`ecw2GWvCguJ}%Dt51O=8)&w>rB)a!`5)GP_>H$5f1^LbM z9oZXuuYWeE`_sQakav}PKEN)@i|KF8a-&6$ClYV2$@kxGeSRv;d+f0#GLyeeM68+} zItScG`?{_>RNbZX7Up38&D&xt(SsG@g;n77hM|G~9Q&MjF@U_cEo zehPY2NBoSyH__@%9$6B~ApM$#Vf`ygm)%f_d;|5}?&Tqtr~$rtfZAH#jUuOEu3xfv zIbrjQR1_qW-zj84?k<{BBR6BEqW}XB%j`p;<1-o&wQt=5UwbIQBmYG}Y?pl^8fx)L zU(`BS5KnGXcE^8fxer0hYZYQ*2Wh;ua|HPzkWM}#sP;8QFOA^MC1*nw72T`pynpt+#V1xH%Wt za89yhLZV`PnZue46p?Z3L8{@Un1CnH&> z?9$n@|Gou}Y`IP;RN}U2)}A}^2t5u=w^W4AEBY7MDf+@x@VqbQDiq|Aiqj&g_X8d_ zFZO<3)dMc-W^Urd&hxpK4S0ryyd$w0Fm8m>!UmZ<@N9~dNX!2l1jfpzzpBc~_Xw4G z28MC|1iz(1GWb3dt8E_RWVW=iQa) z=pGM}^c@ptMCMNwgF$>e3q z=AiO;4ca|`gHvI0BloknjWJ%S;Y(k=ThR+huG7hWxFd@?X^LW0Lc%&;kQ=d8cO0~? zNVjEsVLCA-Fe3&DDuzgg_YDCF6)_eB6w_md)w*9lB6xjm-)4CsBsaKjyD%{@AutIB v1uG5%0vZJX1Qf&8yZ(lZvg>dBvmlr6`eL>??I#2X5r^iMF-ks`0s;sC%p3y` literal 0 HcmV?d00001 diff --git a/testing/web3signer_tests/tls/password.txt b/testing/web3signer_tests/tls/password.txt new file mode 100644 index 000000000..375d5c3ce --- /dev/null +++ b/testing/web3signer_tests/tls/password.txt @@ -0,0 +1 @@ +meow diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 7bc049824..549b86c85 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -68,3 +68,5 @@ itertools = "0.10.0" monitoring_api = { path = "../common/monitoring_api" } sensitive_url = { path = "../common/sensitive_url" } task_executor = { path = "../common/task_executor" } +reqwest = { version = "0.11.0", features = ["json","stream"] } +url = "2.2.2" diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 841a12574..95500fc94 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -5,6 +5,7 @@ use crate::{ validator_store::ValidatorStore, }; use environment::RuntimeContext; +use futures::future::join_all; use slog::{crit, error, info, trace}; use slot_clock::SlotClock; use std::collections::HashMap; @@ -288,7 +289,7 @@ impl AttestationService { // Then download, sign and publish a `SignedAggregateAndProof` for each // validator that is elected to aggregate for this `slot` and // `committee_index`. - self.produce_and_publish_aggregates(attestation_data, &validator_duties) + self.produce_and_publish_aggregates(&attestation_data, &validator_duties) .await .map_err(move |e| { crit!( @@ -350,10 +351,11 @@ impl AttestationService { .await .map_err(|e| e.to_string())?; - let mut attestations = Vec::with_capacity(validator_duties.len()); - - for duty_and_proof in validator_duties { + // Create futures to produce signed `Attestation` objects. + let attestation_data_ref = &attestation_data; + let signing_futures = validator_duties.iter().map(|duty_and_proof| async move { let duty = &duty_and_proof.duty; + let attestation_data = attestation_data_ref; // Ensure that the attestation matches the duties. #[allow(clippy::suspicious_operation_groupings)] @@ -368,7 +370,7 @@ impl AttestationService { "duty_index" => duty.committee_index, "attestation_index" => attestation_data.index, ); - continue; + return None; } let mut attestation = Attestation { @@ -377,26 +379,38 @@ impl AttestationService { signature: AggregateSignature::infinity(), }; - if let Err(e) = self.validator_store.sign_attestation( - duty.pubkey, - duty.validator_committee_index as usize, - &mut attestation, - current_epoch, - ) { - crit!( - log, - "Failed to sign attestation"; - "error" => ?e, - "committee_index" => committee_index, - "slot" => slot.as_u64(), - ); - continue; - } else { - attestations.push(attestation); + match self + .validator_store + .sign_attestation( + duty.pubkey, + duty.validator_committee_index as usize, + &mut attestation, + current_epoch, + ) + .await + { + Ok(()) => Some(attestation), + Err(e) => { + crit!( + log, + "Failed to sign attestation"; + "error" => ?e, + "committee_index" => committee_index, + "slot" => slot.as_u64(), + ); + None + } } - } + }); - let attestations_slice = attestations.as_slice(); + // Execute all the futures in parallel, collecting any successful results. + let attestations = &join_all(signing_futures) + .await + .into_iter() + .flatten() + .collect::>>(); + + // Post the attestations to the BN. match self .beacon_nodes .first_success(RequireSynced::No, |beacon_node| async move { @@ -405,7 +419,7 @@ impl AttestationService { &[metrics::ATTESTATIONS_HTTP_POST], ); beacon_node - .post_beacon_pool_attestations(attestations_slice) + .post_beacon_pool_attestations(attestations) .await }) .await @@ -447,13 +461,12 @@ impl AttestationService { /// returned to the BN. async fn produce_and_publish_aggregates( &self, - attestation_data: AttestationData, + attestation_data: &AttestationData, validator_duties: &[DutyAndProof], ) -> Result<(), String> { let log = self.context.log(); - let attestation_data_ref = &attestation_data; - let aggregated_attestation = self + let aggregated_attestation = &self .beacon_nodes .first_success(RequireSynced::No, |beacon_node| async move { let _timer = metrics::start_timer_vec( @@ -462,55 +475,59 @@ impl AttestationService { ); beacon_node .get_validator_aggregate_attestation( - attestation_data_ref.slot, - attestation_data_ref.tree_hash_root(), + attestation_data.slot, + attestation_data.tree_hash_root(), ) .await .map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))? - .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data_ref)) + .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data)) .map(|result| result.data) }) .await .map_err(|e| e.to_string())?; - let mut signed_aggregate_and_proofs = Vec::new(); - - for duty_and_proof in validator_duties { + // Create futures to produce the signed aggregated attestations. + let signing_futures = validator_duties.iter().map(|duty_and_proof| async move { let duty = &duty_and_proof.duty; - - let selection_proof = if let Some(proof) = duty_and_proof.selection_proof.as_ref() { - proof - } else { - // Do not produce a signed aggregate for validators that are not - // subscribed aggregators. - continue; - }; + let selection_proof = duty_and_proof.selection_proof.as_ref()?; let slot = attestation_data.slot; let committee_index = attestation_data.index; if duty.slot != slot || duty.committee_index != committee_index { crit!(log, "Inconsistent validator duties during signing"); - continue; + return None; } - match self.validator_store.produce_signed_aggregate_and_proof( - duty.pubkey, - duty.validator_index, - aggregated_attestation.clone(), - selection_proof.clone(), - ) { - Ok(aggregate) => signed_aggregate_and_proofs.push(aggregate), + match self + .validator_store + .produce_signed_aggregate_and_proof( + duty.pubkey, + duty.validator_index, + aggregated_attestation.clone(), + selection_proof.clone(), + ) + .await + { + Ok(aggregate) => Some(aggregate), Err(e) => { crit!( log, "Failed to sign attestation"; - "error" => ?e + "error" => ?e, + "pubkey" => ?duty.pubkey, ); - continue; + None } } - } + }); + + // Execute all the futures in parallel, collecting any successful results. + let signed_aggregate_and_proofs = join_all(signing_futures) + .await + .into_iter() + .flatten() + .collect::>(); if !signed_aggregate_and_proofs.is_empty() { let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice(); diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index f102df18b..289f8a09f 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -240,6 +240,7 @@ impl BlockService { let randao_reveal = self .validator_store .randao_reveal(validator_pubkey, slot.epoch(E::slots_per_epoch())) + .await .map_err(|e| format!("Unable to produce randao reveal signature: {:?}", e))? .into(); @@ -276,6 +277,7 @@ impl BlockService { let signed_block = self_ref .validator_store .sign_block(*validator_pubkey_ref, block, current_slot) + .await .map_err(|e| format!("Unable to sign block: {:?}", e))?; let _post_timer = metrics::start_timer_vec( diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 6e5e5a546..6428034d8 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -16,6 +16,7 @@ use crate::{ }; use environment::RuntimeContext; use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId}; +use futures::future::join_all; use parking_lot::RwLock; use safe_arith::ArithError; use slog::{debug, error, info, warn, Logger}; @@ -64,13 +65,14 @@ pub struct DutyAndProof { impl DutyAndProof { /// Instantiate `Self`, computing the selection proof as well. - pub fn new( + pub async fn new( duty: AttesterData, validator_store: &ValidatorStore, spec: &ChainSpec, ) -> Result { let selection_proof = validator_store .produce_selection_proof(duty.pubkey, duty.slot) + .await .map_err(Error::FailedToProduceSelectionProof)?; let selection_proof = selection_proof @@ -637,56 +639,77 @@ async fn poll_beacon_attesters_for_epoch( let dependent_root = response.dependent_root; - let relevant_duties = response - .data - .into_iter() - .filter(|attester_duty| local_pubkeys.contains(&attester_duty.pubkey)) - .collect::>(); + // Filter any duties that are not relevant or already known. + let new_duties = { + // Avoid holding the read-lock for any longer than required. + let attesters = duties_service.attesters.read(); + response + .data + .into_iter() + .filter(|duty| local_pubkeys.contains(&duty.pubkey)) + .filter(|duty| { + // Only update the duties if either is true: + // + // - There were no known duties for this epoch. + // - The dependent root has changed, signalling a re-org. + attesters.get(&duty.pubkey).map_or(true, |duties| { + duties + .get(&epoch) + .map_or(true, |(prior, _)| *prior != dependent_root) + }) + }) + .collect::>() + }; debug!( log, "Downloaded attester duties"; "dependent_root" => %dependent_root, - "num_relevant_duties" => relevant_duties.len(), + "num_new_duties" => new_duties.len(), ); + // Produce the `DutyAndProof` messages in parallel. + let duty_and_proof_results = join_all(new_duties.into_iter().map(|duty| { + DutyAndProof::new(duty, &duties_service.validator_store, &duties_service.spec) + })) + .await; + + // Update the duties service with the new `DutyAndProof` messages. + let mut attesters = duties_service.attesters.write(); let mut already_warned = Some(()); - let mut attesters_map = duties_service.attesters.write(); - for duty in relevant_duties { - let attesters_map = attesters_map.entry(duty.pubkey).or_default(); + for result in duty_and_proof_results { + let duty_and_proof = match result { + Ok(duty_and_proof) => duty_and_proof, + Err(e) => { + error!( + log, + "Failed to produce duty and proof"; + "error" => ?e, + "msg" => "may impair attestation duties" + ); + // Do not abort the entire batch for a single failure. + continue; + } + }; - // Only update the duties if either is true: - // - // - There were no known duties for this epoch. - // - The dependent root has changed, signalling a re-org. - if attesters_map - .get(&epoch) - .map_or(true, |(prior, _)| *prior != dependent_root) + let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default(); + + if let Some((prior_dependent_root, _)) = + attester_map.insert(epoch, (dependent_root, duty_and_proof)) { - let duty_and_proof = - DutyAndProof::new(duty, &duties_service.validator_store, &duties_service.spec)?; - - if let Some((prior_dependent_root, _)) = - attesters_map.insert(epoch, (dependent_root, duty_and_proof)) - { - // Using `already_warned` avoids excessive logs. - if dependent_root != prior_dependent_root && already_warned.take().is_some() { - warn!( - log, - "Attester duties re-org"; - "prior_dependent_root" => %prior_dependent_root, - "dependent_root" => %dependent_root, - "msg" => "this may happen from time to time" - ) - } + // Using `already_warned` avoids excessive logs. + if dependent_root != prior_dependent_root && already_warned.take().is_some() { + warn!( + log, + "Attester duties re-org"; + "prior_dependent_root" => %prior_dependent_root, + "dependent_root" => %dependent_root, + "msg" => "this may happen from time to time" + ) } } } - // Drop the write-lock. - // - // This is strictly unnecessary since the function ends immediately afterwards, but we remain - // defensive regardless. - drop(attesters_map); + drop(attesters); Ok(()) } diff --git a/validator_client/src/duties_service/sync.rs b/validator_client/src/duties_service/sync.rs index 3608c17cb..f61c600e9 100644 --- a/validator_client/src/duties_service/sync.rs +++ b/validator_client/src/duties_service/sync.rs @@ -2,6 +2,7 @@ use crate::{ doppelganger_service::DoppelgangerStatus, duties_service::{DutiesService, Error}, }; +use futures::future::join_all; use itertools::Itertools; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use slog::{crit, debug, info, warn}; @@ -330,8 +331,8 @@ pub async fn poll_sync_committee_duties( if !new_pre_compute_duties.is_empty() { let sub_duties_service = duties_service.clone(); - duties_service.context.executor.spawn_blocking( - move || { + duties_service.context.executor.spawn( + async move { fill_in_aggregation_proofs( sub_duties_service, &new_pre_compute_duties, @@ -339,6 +340,7 @@ pub async fn poll_sync_committee_duties( current_epoch, current_pre_compute_epoch, ) + .await }, "duties_service_sync_selection_proofs", ); @@ -370,8 +372,8 @@ pub async fn poll_sync_committee_duties( if !new_pre_compute_duties.is_empty() { let sub_duties_service = duties_service.clone(); - duties_service.context.executor.spawn_blocking( - move || { + duties_service.context.executor.spawn( + async move { fill_in_aggregation_proofs( sub_duties_service, &new_pre_compute_duties, @@ -379,6 +381,7 @@ pub async fn poll_sync_committee_duties( current_epoch, pre_compute_epoch, ) + .await }, "duties_service_sync_selection_proofs", ); @@ -468,7 +471,7 @@ pub async fn poll_sync_committee_duties_for_period( +pub async fn fill_in_aggregation_proofs( duties_service: Arc>, pre_compute_duties: &[(Epoch, SyncDuty)], sync_committee_period: u64, @@ -487,60 +490,54 @@ pub fn fill_in_aggregation_proofs( // Generate selection proofs for each validator at each slot, one epoch at a time. for epoch in (current_epoch.as_u64()..=pre_compute_epoch.as_u64()).map(Epoch::new) { - // Generate proofs. - let validator_proofs: Vec<(u64, Vec<_>)> = pre_compute_duties - .iter() - .filter_map(|(validator_start_epoch, duty)| { - // Proofs are already known at this epoch for this validator. - if epoch < *validator_start_epoch { - return None; + let mut validator_proofs = vec![]; + for (validator_start_epoch, duty) in pre_compute_duties { + // Proofs are already known at this epoch for this validator. + if epoch < *validator_start_epoch { + continue; + } + + let subnet_ids = match duty.subnet_ids::() { + Ok(subnet_ids) => subnet_ids, + Err(e) => { + crit!( + log, + "Arithmetic error computing subnet IDs"; + "error" => ?e, + ); + continue; } + }; - let subnet_ids = duty - .subnet_ids::() - .map_err(|e| { - crit!( - log, - "Arithmetic error computing subnet IDs"; - "error" => ?e, - ); - }) - .ok()?; + // Create futures to produce proofs. + let duties_service_ref = &duties_service; + let futures = epoch + .slot_iter(E::slots_per_epoch()) + .cartesian_product(&subnet_ids) + .map(|(duty_slot, subnet_id)| async move { + // Construct proof for prior slot. + let slot = duty_slot - 1; - let proofs = epoch - .slot_iter(E::slots_per_epoch()) - .cartesian_product(&subnet_ids) - .filter_map(|(duty_slot, &subnet_id)| { - // Construct proof for prior slot. - let slot = duty_slot - 1; + let proof = match duties_service_ref + .validator_store + .produce_sync_selection_proof(&duty.pubkey, slot, *subnet_id) + .await + { + Ok(proof) => proof, + Err(e) => { + warn!( + log, + "Unable to sign selection proof"; + "error" => ?e, + "pubkey" => ?duty.pubkey, + "slot" => slot, + ); + return None; + } + }; - let proof = duties_service - .validator_store - .produce_sync_selection_proof(&duty.pubkey, slot, subnet_id) - .map_err(|_| { - warn!( - log, - "Pubkey missing when signing selection proof"; - "pubkey" => ?duty.pubkey, - "slot" => slot, - ); - }) - .ok()?; - - let is_aggregator = proof - .is_aggregator::() - .map_err(|e| { - warn!( - log, - "Error determining is_aggregator"; - "pubkey" => ?duty.pubkey, - "slot" => slot, - "error" => ?e, - ); - }) - .ok()?; - - if is_aggregator { + match proof.is_aggregator::() { + Ok(true) => { debug!( log, "Validator is sync aggregator"; @@ -548,16 +545,31 @@ pub fn fill_in_aggregation_proofs( "slot" => slot, "subnet_id" => %subnet_id, ); - Some(((slot, subnet_id), proof)) - } else { + Some(((slot, *subnet_id), proof)) + } + Ok(false) => None, + Err(e) => { + warn!( + log, + "Error determining is_aggregator"; + "pubkey" => ?duty.pubkey, + "slot" => slot, + "error" => ?e, + ); None } - }) - .collect(); + } + }); - Some((duty.validator_index, proofs)) - }) - .collect(); + // Execute all the futures in parallel, collecting any successful results. + let proofs = join_all(futures) + .await + .into_iter() + .flatten() + .collect::>(); + + validator_proofs.push((duty.validator_index, proofs)); + } // Add to global storage (we add regularly so the proofs can be used ASAP). let sync_map = duties_service.sync_duties.committees.read(); diff --git a/validator_client/src/http_api/create_validator.rs b/validator_client/src/http_api/create_validator.rs index c2b50c864..3c4901e61 100644 --- a/validator_client/src/http_api/create_validator.rs +++ b/validator_client/src/http_api/create_validator.rs @@ -1,4 +1,5 @@ use crate::ValidatorStore; +use account_utils::validator_definitions::{SigningDefinition, ValidatorDefinition}; use account_utils::{ eth2_wallet::{bip39::Mnemonic, WalletBuilder}, random_mnemonic, random_password, ZeroizeString, @@ -21,7 +22,7 @@ use validator_dir::Builder as ValidatorDirBuilder; /// /// If `key_derivation_path_offset` is supplied then the EIP-2334 validator index will start at /// this point. -pub async fn create_validators, T: 'static + SlotClock, E: EthSpec>( +pub async fn create_validators_mnemonic, T: 'static + SlotClock, E: EthSpec>( mnemonic_opt: Option, key_derivation_path_offset: Option, validator_requests: &[api_types::ValidatorRequest], @@ -159,3 +160,33 @@ pub async fn create_validators, T: 'static + SlotClock, E: EthSpe Ok((validators, mnemonic)) } + +pub async fn create_validators_web3signer( + validator_requests: &[api_types::Web3SignerValidatorRequest], + validator_store: &ValidatorStore, +) -> Result<(), warp::Rejection> { + for request in validator_requests { + let validator_definition = ValidatorDefinition { + enabled: request.enable, + voting_public_key: request.voting_public_key.clone(), + graffiti: request.graffiti.clone(), + description: request.description.clone(), + signing_definition: SigningDefinition::Web3Signer { + url: request.url.clone(), + root_certificate_path: request.root_certificate_path.clone(), + request_timeout_ms: request.request_timeout_ms, + }, + }; + validator_store + .add_validator(validator_definition) + .await + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "failed to initialize validator: {:?}", + e + )) + })?; + } + + Ok(()) +} diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index f07ad8952..5e0f3443a 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -4,7 +4,7 @@ mod tests; use crate::ValidatorStore; use account_utils::mnemonic_from_phrase; -use create_validator::create_validators; +use create_validator::{create_validators_mnemonic, create_validators_web3signer}; use eth2::lighthouse_vc::types::{self as api_types, PublicKey, PublicKeyBytes}; use lighthouse_version::version_with_platform; use serde::{Deserialize, Serialize}; @@ -273,14 +273,15 @@ pub fn serve( runtime: Weak| { blocking_signed_json_task(signer, move || { if let Some(runtime) = runtime.upgrade() { - let (validators, mnemonic) = runtime.block_on(create_validators( - None, - None, - &body, - &validator_dir, - &validator_store, - &spec, - ))?; + let (validators, mnemonic) = + runtime.block_on(create_validators_mnemonic( + None, + None, + &body, + &validator_dir, + &validator_store, + &spec, + ))?; let response = api_types::PostValidatorsResponseData { mnemonic: mnemonic.into_phrase().into(), validators, @@ -322,14 +323,15 @@ pub fn serve( e )) })?; - let (validators, _mnemonic) = runtime.block_on(create_validators( - Some(mnemonic), - Some(body.key_derivation_path_offset), - &body.validators, - &validator_dir, - &validator_store, - &spec, - ))?; + let (validators, _mnemonic) = + runtime.block_on(create_validators_mnemonic( + Some(mnemonic), + Some(body.key_derivation_path_offset), + &body.validators, + &validator_dir, + &validator_store, + &spec, + ))?; Ok(api_types::GenericResponse::from(validators)) } else { Err(warp_utils::reject::custom_server_error( @@ -416,6 +418,33 @@ pub fn serve( }, ); + // POST lighthouse/validators/web3signer + let post_validators_web3signer = warp::path("lighthouse") + .and(warp::path("validators")) + .and(warp::path("web3signer")) + .and(warp::path::end()) + .and(warp::body::json()) + .and(validator_store_filter.clone()) + .and(signer.clone()) + .and(runtime_filter.clone()) + .and_then( + |body: Vec, + validator_store: Arc>, + signer, + runtime: Weak| { + blocking_signed_json_task(signer, move || { + if let Some(runtime) = runtime.upgrade() { + runtime.block_on(create_validators_web3signer(&body, &validator_store))?; + Ok(()) + } else { + Err(warp_utils::reject::custom_server_error( + "Runtime shutdown".into(), + )) + } + }) + }, + ); + // PATCH lighthouse/validators/{validator_pubkey} let patch_validators = warp::path("lighthouse") .and(warp::path("validators")) @@ -484,7 +513,8 @@ pub fn serve( .or(warp::post().and( post_validators .or(post_validators_keystore) - .or(post_validators_mnemonic), + .or(post_validators_mnemonic) + .or(post_validators_web3signer), )) .or(warp::patch().and(patch_validators)), ) diff --git a/validator_client/src/http_api/tests.rs b/validator_client/src/http_api/tests.rs index ebb011671..c9ef869be 100644 --- a/validator_client/src/http_api/tests.rs +++ b/validator_client/src/http_api/tests.rs @@ -4,7 +4,8 @@ use crate::doppelganger_service::DoppelgangerService; use crate::{ http_api::{ApiSecret, Config as HttpConfig, Context}, - Config, InitializedValidators, ValidatorDefinitions, ValidatorStore, + initialized_validators::InitializedValidators, + Config, ValidatorDefinitions, ValidatorStore, }; use account_utils::{ eth2_wallet::WalletBuilder, mnemonic_from_phrase, random_mnemonic, random_password, @@ -27,6 +28,7 @@ use std::marker::PhantomData; use std::net::Ipv4Addr; use std::sync::Arc; use std::time::Duration; +use task_executor::TaskExecutor; use tempfile::{tempdir, TempDir}; use tokio::runtime::Runtime; use tokio::sync::oneshot; @@ -41,6 +43,7 @@ struct ApiTester { url: SensitiveUrl, _server_shutdown: oneshot::Sender<()>, _validator_dir: TempDir, + _runtime_shutdown: exit_future::Signal, } // Builds a runtime to be used in the testing configuration. @@ -85,6 +88,10 @@ impl ApiTester { let slot_clock = TestingSlotClock::new(Slot::new(0), Duration::from_secs(0), Duration::from_secs(1)); + let (runtime_shutdown, exit) = exit_future::signal(); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let executor = TaskExecutor::new(runtime.clone(), exit, log.clone(), shutdown_tx); + let validator_store = ValidatorStore::<_, E>::new( initialized_validators, slashing_protection, @@ -92,6 +99,7 @@ impl ApiTester { spec, Some(Arc::new(DoppelgangerService::new(log.clone()))), slot_clock, + executor, log.clone(), ); @@ -141,6 +149,7 @@ impl ApiTester { client, url, _server_shutdown: shutdown_tx, + _runtime_shutdown: runtime_shutdown, } } @@ -425,6 +434,40 @@ impl ApiTester { self } + pub async fn create_web3signer_validators(self, s: Web3SignerValidatorScenario) -> Self { + let initial_vals = self.vals_total(); + let initial_enabled_vals = self.vals_enabled(); + + let request: Vec<_> = (0..s.count) + .map(|i| { + let kp = Keypair::random(); + Web3SignerValidatorRequest { + enable: s.enabled, + description: format!("{}", i), + graffiti: None, + voting_public_key: kp.pk, + url: format!("http://signer_{}.com/", i), + root_certificate_path: None, + request_timeout_ms: None, + } + }) + .collect(); + + self.client + .post_lighthouse_validators_web3signer(&request) + .await + .unwrap_err(); + + assert_eq!(self.vals_total(), initial_vals + s.count); + if s.enabled { + assert_eq!(self.vals_enabled(), initial_enabled_vals + s.count); + } else { + assert_eq!(self.vals_enabled(), initial_enabled_vals); + }; + + self + } + pub async fn set_validator_enabled(self, index: usize, enabled: bool) -> Self { let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index]; @@ -480,6 +523,11 @@ struct KeystoreValidatorScenario { correct_password: bool, } +struct Web3SignerValidatorScenario { + count: usize, + enabled: bool, +} + #[test] fn invalid_pubkey() { let runtime = build_runtime(); @@ -677,3 +725,22 @@ fn keystore_validator_creation() { .assert_validators_count(2); }); } + +#[test] +fn web3signer_validator_creation() { + let runtime = build_runtime(); + let weak_runtime = Arc::downgrade(&runtime); + runtime.block_on(async { + ApiTester::new(weak_runtime) + .await + .assert_enabled_validators_count(0) + .assert_validators_count(0) + .create_web3signer_validators(Web3SignerValidatorScenario { + count: 1, + enabled: true, + }) + .await + .assert_enabled_validators_count(1) + .assert_validators_count(1); + }); +} diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index db8eb8212..29e52c387 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -30,6 +30,8 @@ pub const VALIDATOR_ID_HTTP_GET: &str = "validator_id_http_get"; pub const SUBSCRIPTIONS_HTTP_POST: &str = "subscriptions_http_post"; pub const UPDATE_PROPOSERS: &str = "update_proposers"; pub const SUBSCRIPTIONS: &str = "subscriptions"; +pub const LOCAL_KEYSTORE: &str = "local_keystore"; +pub const WEB3SIGNER: &str = "web3signer"; pub use lighthouse_metrics::*; @@ -138,6 +140,14 @@ lazy_static::lazy_static! { "sync_eth2_fallback_connected", "Set to 1 if connected to atleast one synced eth2 fallback node, otherwise set to 0", ); + /* + * Signing Metrics + */ + pub static ref SIGNING_TIMES: Result = try_create_histogram_vec( + "vc_signing_times_seconds", + "Duration to obtain a signature", + &["type"] + ); } pub fn gather_prometheus_metrics( diff --git a/validator_client/src/initialized_validators.rs b/validator_client/src/initialized_validators.rs index 4e9bbef76..57585e267 100644 --- a/validator_client/src/initialized_validators.rs +++ b/validator_client/src/initialized_validators.rs @@ -6,6 +6,7 @@ //! The `InitializedValidators` struct in this file serves as the source-of-truth of which //! validators are managed by this validator client. +use crate::signing_method::SigningMethod; use account_utils::{ read_password, read_password_from_user, validator_definitions::{ @@ -16,16 +17,26 @@ use account_utils::{ use eth2_keystore::Keystore; use lighthouse_metrics::set_gauge; use lockfile::{Lockfile, LockfileError}; +use reqwest::{Certificate, Client, Error as ReqwestError}; use slog::{debug, error, info, warn, Logger}; use std::collections::{HashMap, HashSet}; use std::fs::File; -use std::io; +use std::io::{self, Read}; use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Duration; use types::{Graffiti, Keypair, PublicKey, PublicKeyBytes}; +use url::{ParseError, Url}; use crate::key_cache; use crate::key_cache::KeyCache; +/// Default timeout for a request to a remote signer for a signature. +/// +/// Set to 12 seconds since that's the duration of a slot. A remote signer that cannot sign within +/// that time is outside the synchronous assumptions of Eth2. +const DEFAULT_REMOTE_SIGNER_REQUEST_TIMEOUT: Duration = Duration::from_secs(12); + // Use TTY instead of stdin to capture passwords from users. const USE_STDIN: bool = false; @@ -66,6 +77,12 @@ pub enum Error { ValidatorNotInitialized(PublicKey), /// Unable to read the slot clock. SlotClock, + /// The URL for the remote signer cannot be parsed. + InvalidWeb3SignerUrl(String), + /// Unable to read the root certificate file for the remote signer. + InvalidWeb3SignerRootCertificateFile(io::Error), + InvalidWeb3SignerRootCertificate(ReqwestError), + UnableToBuildWeb3SignerClient(ReqwestError), } impl From for Error { @@ -74,23 +91,9 @@ impl From for Error { } } -/// A method used by a validator to sign messages. -/// -/// Presently there is only a single variant, however we expect more variants to arise (e.g., -/// remote signing). -pub enum SigningMethod { - /// A validator that is defined by an EIP-2335 keystore on the local filesystem. - LocalKeystore { - voting_keystore_path: PathBuf, - voting_keystore_lockfile: Lockfile, - voting_keystore: Keystore, - voting_keypair: Keypair, - }, -} - /// A validator that is ready to sign messages. pub struct InitializedValidator { - signing_method: SigningMethod, + signing_method: Arc, graffiti: Option, /// The validators index in `state.validators`, to be updated by an external service. index: Option, @@ -99,11 +102,13 @@ pub struct InitializedValidator { impl InitializedValidator { /// Return a reference to this validator's lockfile if it has one. pub fn keystore_lockfile(&self) -> Option<&Lockfile> { - match self.signing_method { + match self.signing_method.as_ref() { SigningMethod::LocalKeystore { ref voting_keystore_lockfile, .. } => Some(voting_keystore_lockfile), + // Web3Signer validators do not have any lockfiles. + SigningMethod::Web3Signer { .. } => None, } } } @@ -138,7 +143,7 @@ impl InitializedValidator { return Err(Error::UnableToInitializeDisabledValidator); } - match def.signing_definition { + let signing_method = match def.signing_definition { // Load the keystore, password, decrypt the keypair and create a lockfile for a // EIP-2335 keystore on the local filesystem. SigningDefinition::LocalKeystore { @@ -210,33 +215,77 @@ impl InitializedValidator { let voting_keystore_lockfile = Lockfile::new(lockfile_path)?; - Ok(Self { - signing_method: SigningMethod::LocalKeystore { - voting_keystore_path, - voting_keystore_lockfile, - voting_keystore: voting_keystore.clone(), - voting_keypair, - }, - graffiti: def.graffiti.map(Into::into), - index: None, - }) + SigningMethod::LocalKeystore { + voting_keystore_path, + voting_keystore_lockfile, + voting_keystore: voting_keystore.clone(), + voting_keypair: Arc::new(voting_keypair), + } } - } + SigningDefinition::Web3Signer { + url, + root_certificate_path, + request_timeout_ms, + } => { + let signing_url = build_web3_signer_url(&url, &def.voting_public_key) + .map_err(|e| Error::InvalidWeb3SignerUrl(e.to_string()))?; + let request_timeout = request_timeout_ms + .map(Duration::from_millis) + .unwrap_or(DEFAULT_REMOTE_SIGNER_REQUEST_TIMEOUT); + + let builder = Client::builder().timeout(request_timeout); + + let builder = if let Some(path) = root_certificate_path { + let certificate = load_pem_certificate(path)?; + builder.add_root_certificate(certificate) + } else { + builder + }; + + let http_client = builder + .build() + .map_err(Error::UnableToBuildWeb3SignerClient)?; + + SigningMethod::Web3Signer { + signing_url, + http_client, + voting_public_key: def.voting_public_key, + } + } + }; + + Ok(Self { + signing_method: Arc::new(signing_method), + graffiti: def.graffiti.map(Into::into), + index: None, + }) } /// Returns the voting public key for this validator. pub fn voting_public_key(&self) -> &PublicKey { - match &self.signing_method { + match self.signing_method.as_ref() { SigningMethod::LocalKeystore { voting_keypair, .. } => &voting_keypair.pk, + SigningMethod::Web3Signer { + voting_public_key, .. + } => voting_public_key, } } +} - /// Returns the voting keypair for this validator. - pub fn voting_keypair(&self) -> &Keypair { - match &self.signing_method { - SigningMethod::LocalKeystore { voting_keypair, .. } => voting_keypair, - } - } +pub fn load_pem_certificate>(pem_path: P) -> Result { + let mut buf = Vec::new(); + File::open(&pem_path) + .map_err(Error::InvalidWeb3SignerRootCertificateFile)? + .read_to_end(&mut buf) + .map_err(Error::InvalidWeb3SignerRootCertificateFile)?; + Certificate::from_pem(&buf).map_err(Error::InvalidWeb3SignerRootCertificate) +} + +fn build_web3_signer_url(base_url: &str, voting_public_key: &PublicKey) -> Result { + Url::parse(base_url)?.join(&format!( + "api/v1/eth2/sign/{}", + voting_public_key.to_string() + )) } /// Try to unlock `keystore` at `keystore_path` by prompting the user via `stdin`. @@ -325,12 +374,14 @@ impl InitializedValidators { self.validators.iter().map(|(pubkey, _)| pubkey) } - /// Returns the voting `Keypair` for a given voting `PublicKey`, if that validator is known to - /// `self` **and** the validator is enabled. - pub fn voting_keypair(&self, voting_public_key: &PublicKeyBytes) -> Option<&Keypair> { + /// Returns the voting `Keypair` for a given voting `PublicKey`, if all are true: + /// + /// - The validator is known to `self`. + /// - The validator is enabled. + pub fn signing_method(&self, voting_public_key: &PublicKeyBytes) -> Option> { self.validators .get(voting_public_key) - .map(|v| v.voting_keypair()) + .map(|v| v.signing_method.clone()) } /// Add a validator definition to `self`, overwriting the on-disk representation of `self`. @@ -431,6 +482,8 @@ impl InitializedValidators { }; definitions_map.insert(*key_store.uuid(), def); } + // Remote signer validators don't interact with the key cache. + SigningDefinition::Web3Signer { .. } => (), } } @@ -451,13 +504,13 @@ impl InitializedValidators { let mut public_keys = Vec::new(); for uuid in cache.uuids() { let def = definitions_map.get(uuid).expect("Existence checked before"); - let pw = match &def.signing_definition { + match &def.signing_definition { SigningDefinition::LocalKeystore { voting_keystore_password_path, voting_keystore_password, voting_keystore_path, } => { - if let Some(p) = voting_keystore_password { + let pw = if let Some(p) = voting_keystore_password { p.as_ref().to_vec().into() } else if let Some(path) = voting_keystore_password_path { read_password(path).map_err(Error::UnableToReadVotingKeystorePassword)? @@ -468,11 +521,13 @@ impl InitializedValidators { .as_ref() .to_vec() .into() - } + }; + passwords.push(pw); + public_keys.push(def.voting_public_key.clone()); } + // Remote signer validators don't interact with the key cache. + SigningDefinition::Web3Signer { .. } => (), }; - passwords.push(pw); - public_keys.push(def.voting_public_key.clone()); } //decrypt @@ -546,6 +601,7 @@ impl InitializedValidators { info!( self.log, "Enabled validator"; + "signing_method" => "local_keystore", "voting_pubkey" => format!("{:?}", def.voting_public_key), ); @@ -565,6 +621,40 @@ impl InitializedValidators { self.log, "Failed to initialize validator"; "error" => format!("{:?}", e), + "signing_method" => "local_keystore", + "validator" => format!("{:?}", def.voting_public_key) + ); + + // Exit on an invalid validator. + return Err(e); + } + } + } + SigningDefinition::Web3Signer { .. } => { + match InitializedValidator::from_definition( + def.clone(), + &mut key_cache, + &mut key_stores, + ) + .await + { + Ok(init) => { + self.validators + .insert(init.voting_public_key().compress(), init); + + info!( + self.log, + "Enabled validator"; + "signing_method" => "remote_signer", + "voting_pubkey" => format!("{:?}", def.voting_public_key), + ); + } + Err(e) => { + error!( + self.log, + "Failed to initialize validator"; + "error" => format!("{:?}", e), + "signing_method" => "remote_signer", "validator" => format!("{:?}", def.voting_public_key) ); @@ -585,6 +675,8 @@ impl InitializedValidators { disabled_uuids.insert(*key_store.uuid()); } } + // Remote signers do not interact with the key cache. + SigningDefinition::Web3Signer { .. } => (), } info!( diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 32b5bab8c..ec7238427 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -7,19 +7,22 @@ mod config; mod duties_service; mod graffiti_file; mod http_metrics; -mod initialized_validators; mod key_cache; mod notifier; +mod signing_method; mod sync_committee_service; -mod validator_store; mod doppelganger_service; pub mod http_api; +pub mod initialized_validators; +pub mod validator_store; pub use cli::cli_app; pub use config::Config; +use initialized_validators::InitializedValidators; use lighthouse_metrics::set_gauge; use monitoring_api::{MonitoringHttpClient, ProcessType}; +pub use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME}; use crate::beacon_node_fallback::{ start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, RequireSynced, @@ -33,10 +36,8 @@ use duties_service::DutiesService; use environment::RuntimeContext; use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Timeouts}; use http_api::ApiSecret; -use initialized_validators::InitializedValidators; use notifier::spawn_notifier; use parking_lot::RwLock; -use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME}; use slog::{error, info, warn, Logger}; use slot_clock::SlotClock; use slot_clock::SystemTimeSlotClock; @@ -332,6 +333,7 @@ impl ProductionValidatorClient { context.eth2_config.spec.clone(), doppelganger_service.clone(), slot_clock.clone(), + context.executor.clone(), log.clone(), )); diff --git a/validator_client/src/signing_method.rs b/validator_client/src/signing_method.rs new file mode 100644 index 000000000..561cda161 --- /dev/null +++ b/validator_client/src/signing_method.rs @@ -0,0 +1,222 @@ +//! Provides methods for obtaining validator signatures, including: +//! +//! - Via a local `Keypair`. +//! - Via a remote signer (Web3Signer) + +use crate::http_metrics::metrics; +use eth2_keystore::Keystore; +use lockfile::Lockfile; +use reqwest::Client; +use std::path::PathBuf; +use std::sync::Arc; +use task_executor::TaskExecutor; +use types::*; +use url::Url; +use web3signer::{ForkInfo, SigningRequest, SigningResponse}; + +pub use web3signer::Web3SignerObject; + +mod web3signer; + +#[derive(Debug, PartialEq)] +pub enum Error { + InconsistentDomains { + message_type_domain: Domain, + domain: Domain, + }, + Web3SignerRequestFailed(String), + Web3SignerJsonParsingFailed(String), + ShuttingDown, + TokioJoin(String), +} + +/// Enumerates all messages that can be signed by a validator. +pub enum SignableMessage<'a, T: EthSpec> { + RandaoReveal(Epoch), + BeaconBlock(&'a BeaconBlock), + AttestationData(&'a AttestationData), + SignedAggregateAndProof(&'a AggregateAndProof), + SelectionProof(Slot), + SyncSelectionProof(&'a SyncAggregatorSelectionData), + SyncCommitteeSignature { + beacon_block_root: Hash256, + slot: Slot, + }, + SignedContributionAndProof(&'a ContributionAndProof), +} + +impl<'a, T: EthSpec> SignableMessage<'a, T> { + /// Returns the `SignedRoot` for the contained message. + /// + /// The actual `SignedRoot` trait is not used since it also requires a `TreeHash` impl, which is + /// not required here. + pub fn signing_root(&self, domain: Hash256) -> Hash256 { + match self { + SignableMessage::RandaoReveal(epoch) => epoch.signing_root(domain), + SignableMessage::BeaconBlock(b) => b.signing_root(domain), + SignableMessage::AttestationData(a) => a.signing_root(domain), + SignableMessage::SignedAggregateAndProof(a) => a.signing_root(domain), + SignableMessage::SelectionProof(slot) => slot.signing_root(domain), + SignableMessage::SyncSelectionProof(s) => s.signing_root(domain), + SignableMessage::SyncCommitteeSignature { + beacon_block_root, .. + } => beacon_block_root.signing_root(domain), + SignableMessage::SignedContributionAndProof(c) => c.signing_root(domain), + } + } +} + +/// A method used by a validator to sign messages. +/// +/// Presently there is only a single variant, however we expect more variants to arise (e.g., +/// remote signing). +pub enum SigningMethod { + /// A validator that is defined by an EIP-2335 keystore on the local filesystem. + LocalKeystore { + voting_keystore_path: PathBuf, + voting_keystore_lockfile: Lockfile, + voting_keystore: Keystore, + voting_keypair: Arc, + }, + /// A validator that defers to a Web3Signer server for signing. + /// + /// See: https://docs.web3signer.consensys.net/en/latest/ + Web3Signer { + signing_url: Url, + http_client: Client, + voting_public_key: PublicKey, + }, +} + +/// The additional information used to construct a signature. Mostly used for protection from replay +/// attacks. +pub struct SigningContext { + pub domain: Domain, + pub epoch: Epoch, + pub fork: Fork, + pub genesis_validators_root: Hash256, +} + +impl SigningContext { + /// Returns the `Hash256` to be mixed-in with the signature. + pub fn domain_hash(&self, spec: &ChainSpec) -> Hash256 { + spec.get_domain( + self.epoch, + self.domain, + &self.fork, + self.genesis_validators_root, + ) + } +} + +impl SigningMethod { + /// Return the signature of `signable_message`, with respect to the `signing_context`. + pub async fn get_signature( + &self, + signable_message: SignableMessage<'_, T>, + signing_context: SigningContext, + spec: &ChainSpec, + executor: &TaskExecutor, + ) -> Result { + let domain_hash = signing_context.domain_hash(spec); + let SigningContext { + fork, + genesis_validators_root, + .. + } = signing_context; + + let signing_root = signable_message.signing_root(domain_hash); + + match self { + SigningMethod::LocalKeystore { voting_keypair, .. } => { + let _timer = + metrics::start_timer_vec(&metrics::SIGNING_TIMES, &[metrics::LOCAL_KEYSTORE]); + + let voting_keypair = voting_keypair.clone(); + // Spawn a blocking task to produce the signature. This avoids blocking the core + // tokio executor. + let signature = executor + .spawn_blocking_handle( + move || voting_keypair.sk.sign(signing_root), + "local_keystore_signer", + ) + .ok_or(Error::ShuttingDown)? + .await + .map_err(|e| Error::TokioJoin(e.to_string()))?; + Ok(signature) + } + SigningMethod::Web3Signer { + signing_url, + http_client, + .. + } => { + let _timer = + metrics::start_timer_vec(&metrics::SIGNING_TIMES, &[metrics::WEB3SIGNER]); + + // Map the message into a Web3Signer type. + let object = match signable_message { + SignableMessage::RandaoReveal(epoch) => { + Web3SignerObject::RandaoReveal { epoch } + } + SignableMessage::BeaconBlock(block) => Web3SignerObject::beacon_block(block), + SignableMessage::AttestationData(a) => Web3SignerObject::Attestation(a), + SignableMessage::SignedAggregateAndProof(a) => { + Web3SignerObject::AggregateAndProof(a) + } + SignableMessage::SelectionProof(slot) => { + Web3SignerObject::AggregationSlot { slot } + } + SignableMessage::SyncSelectionProof(s) => { + Web3SignerObject::SyncAggregatorSelectionData(s) + } + SignableMessage::SyncCommitteeSignature { + beacon_block_root, + slot, + } => Web3SignerObject::SyncCommitteeMessage { + beacon_block_root, + slot, + }, + SignableMessage::SignedContributionAndProof(c) => { + Web3SignerObject::ContributionAndProof(c) + } + }; + + // Determine the Web3Signer message type. + let message_type = object.message_type(); + + // The `fork_info` field is not required for deposits since they sign across the + // genesis fork version. + let fork_info = if let Web3SignerObject::Deposit { .. } = &object { + None + } else { + Some(ForkInfo { + fork, + genesis_validators_root, + }) + }; + + let request = SigningRequest { + message_type, + fork_info, + signing_root, + object, + }; + + // Request a signature from the Web3Signer instance via HTTP(S). + let response: SigningResponse = http_client + .post(signing_url.clone()) + .json(&request) + .send() + .await + .map_err(|e| Error::Web3SignerRequestFailed(e.to_string()))? + .error_for_status() + .map_err(|e| Error::Web3SignerRequestFailed(e.to_string()))? + .json() + .await + .map_err(|e| Error::Web3SignerJsonParsingFailed(e.to_string()))?; + + Ok(response.signature) + } + } + } +} diff --git a/validator_client/src/signing_method/web3signer.rs b/validator_client/src/signing_method/web3signer.rs new file mode 100644 index 000000000..6ffe2a1ee --- /dev/null +++ b/validator_client/src/signing_method/web3signer.rs @@ -0,0 +1,114 @@ +//! Contains the types required to make JSON requests to Web3Signer servers. + +use serde::{Deserialize, Serialize}; +use types::*; + +#[derive(Debug, PartialEq, Copy, Clone, Serialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum MessageType { + AggregationSlot, + AggregateAndProof, + Attestation, + BlockV2, + Deposit, + RandaoReveal, + VoluntaryExit, + SyncCommitteeMessage, + SyncCommitteeSelectionProof, + SyncCommitteeContributionAndProof, +} + +#[derive(Debug, PartialEq, Copy, Clone, Serialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum ForkName { + Phase0, + Altair, +} + +#[derive(Debug, PartialEq, Serialize)] +pub struct ForkInfo { + pub fork: Fork, + pub genesis_validators_root: Hash256, +} + +#[derive(Debug, PartialEq, Serialize)] +#[serde(bound = "T: EthSpec", rename_all = "snake_case")] +pub enum Web3SignerObject<'a, T: EthSpec> { + AggregationSlot { + slot: Slot, + }, + AggregateAndProof(&'a AggregateAndProof), + Attestation(&'a AttestationData), + BeaconBlock { + version: ForkName, + block: &'a BeaconBlock, + }, + #[allow(dead_code)] + Deposit { + pubkey: PublicKeyBytes, + withdrawal_credentials: Hash256, + #[serde(with = "eth2_serde_utils::quoted_u64")] + amount: u64, + #[serde(with = "eth2_serde_utils::bytes_4_hex")] + genesis_fork_version: [u8; 4], + }, + RandaoReveal { + epoch: Epoch, + }, + #[allow(dead_code)] + VoluntaryExit(&'a VoluntaryExit), + SyncCommitteeMessage { + beacon_block_root: Hash256, + slot: Slot, + }, + SyncAggregatorSelectionData(&'a SyncAggregatorSelectionData), + ContributionAndProof(&'a ContributionAndProof), +} + +impl<'a, T: EthSpec> Web3SignerObject<'a, T> { + pub fn beacon_block(block: &'a BeaconBlock) -> Self { + let version = match block { + BeaconBlock::Base(_) => ForkName::Phase0, + BeaconBlock::Altair(_) => ForkName::Altair, + }; + + Web3SignerObject::BeaconBlock { version, block } + } + + pub fn message_type(&self) -> MessageType { + match self { + Web3SignerObject::AggregationSlot { .. } => MessageType::AggregationSlot, + Web3SignerObject::AggregateAndProof(_) => MessageType::AggregateAndProof, + Web3SignerObject::Attestation(_) => MessageType::Attestation, + Web3SignerObject::BeaconBlock { .. } => MessageType::BlockV2, + Web3SignerObject::Deposit { .. } => MessageType::Deposit, + Web3SignerObject::RandaoReveal { .. } => MessageType::RandaoReveal, + Web3SignerObject::VoluntaryExit(_) => MessageType::VoluntaryExit, + Web3SignerObject::SyncCommitteeMessage { .. } => MessageType::SyncCommitteeMessage, + Web3SignerObject::SyncAggregatorSelectionData(_) => { + MessageType::SyncCommitteeSelectionProof + } + Web3SignerObject::ContributionAndProof(_) => { + MessageType::SyncCommitteeContributionAndProof + } + } + } +} + +#[derive(Debug, PartialEq, Serialize)] +#[serde(bound = "T: EthSpec")] +pub struct SigningRequest<'a, T: EthSpec> { + #[serde(rename = "type")] + pub message_type: MessageType, + #[serde(skip_serializing_if = "Option::is_none")] + pub fork_info: Option, + #[serde(rename = "signingRoot")] + pub signing_root: Hash256, + #[serde(flatten)] + pub object: Web3SignerObject<'a, T>, +} + +#[derive(Debug, PartialEq, Deserialize)] +pub struct SigningResponse { + pub signature: Signature, +} diff --git a/validator_client/src/sync_committee_service.rs b/validator_client/src/sync_committee_service.rs index 33110ed2e..a349cb75f 100644 --- a/validator_client/src/sync_committee_service.rs +++ b/validator_client/src/sync_committee_service.rs @@ -2,6 +2,7 @@ use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; use crate::{duties_service::DutiesService, validator_store::ValidatorStore}; use environment::RuntimeContext; use eth2::types::BlockId; +use futures::future::join_all; use futures::future::FutureExt; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; @@ -182,23 +183,31 @@ impl SyncCommitteeService { // Spawn one task to publish all of the sync committee signatures. let validator_duties = slot_duties.duties; + let service = self.clone(); self.inner.context.executor.spawn( - self.clone() - .publish_sync_committee_signatures(slot, block_root, validator_duties) - .map(|_| ()), + async move { + service + .publish_sync_committee_signatures(slot, block_root, validator_duties) + .map(|_| ()) + .await + }, "sync_committee_signature_publish", ); let aggregators = slot_duties.aggregators; + let service = self.clone(); self.inner.context.executor.spawn( - self.clone() - .publish_sync_committee_aggregates( - slot, - block_root, - aggregators, - aggregate_production_instant, - ) - .map(|_| ()), + async move { + service + .publish_sync_committee_aggregates( + slot, + block_root, + aggregators, + aggregate_production_instant, + ) + .map(|_| ()) + .await + }, "sync_committee_aggregate_publish", ); @@ -207,42 +216,50 @@ impl SyncCommitteeService { /// Publish sync committee signatures. async fn publish_sync_committee_signatures( - self, + &self, slot: Slot, beacon_block_root: Hash256, validator_duties: Vec, ) -> Result<(), ()> { - let log = self.context.log().clone(); + let log = self.context.log(); - let committee_signatures = validator_duties - .iter() - .filter_map(|duty| { - self.validator_store - .produce_sync_committee_signature( - slot, - beacon_block_root, - duty.validator_index, - &duty.pubkey, - ) - .map_err(|e| { - crit!( - log, - "Failed to sign sync committee signature"; - "validator_index" => duty.validator_index, - "slot" => slot, - "error" => ?e, - ); - }) - .ok() - }) + // Create futures to produce sync committee signatures. + let signature_futures = validator_duties.iter().map(|duty| async move { + match self + .validator_store + .produce_sync_committee_signature( + slot, + beacon_block_root, + duty.validator_index, + &duty.pubkey, + ) + .await + { + Ok(signature) => Some(signature), + Err(e) => { + crit!( + log, + "Failed to sign sync committee signature"; + "validator_index" => duty.validator_index, + "slot" => slot, + "error" => ?e, + ); + None + } + } + }); + + // Execute all the futures in parallel, collecting any successful results. + let committee_signatures = &join_all(signature_futures) + .await + .into_iter() + .flatten() .collect::>(); - let signatures_slice = &committee_signatures; - self.beacon_nodes .first_success(RequireSynced::No, |beacon_node| async move { beacon_node - .post_beacon_pool_sync_committee_signatures(signatures_slice) + .post_beacon_pool_sync_committee_signatures(committee_signatures) .await }) .await @@ -267,7 +284,7 @@ impl SyncCommitteeService { } async fn publish_sync_committee_aggregates( - self, + &self, slot: Slot, beacon_block_root: Hash256, aggregators: HashMap>, @@ -276,22 +293,25 @@ impl SyncCommitteeService { for (subnet_id, subnet_aggregators) in aggregators { let service = self.clone(); self.inner.context.executor.spawn( - service - .publish_sync_committee_aggregate_for_subnet( - slot, - beacon_block_root, - subnet_id, - subnet_aggregators, - aggregate_instant, - ) - .map(|_| ()), + async move { + service + .publish_sync_committee_aggregate_for_subnet( + slot, + beacon_block_root, + subnet_id, + subnet_aggregators, + aggregate_instant, + ) + .map(|_| ()) + .await + }, "sync_committee_aggregate_publish_subnet", ); } } async fn publish_sync_committee_aggregate_for_subnet( - self, + &self, slot: Slot, beacon_block_root: Hash256, subnet_id: SyncSubnetId, @@ -302,7 +322,7 @@ impl SyncCommitteeService { let log = self.context.log(); - let contribution = self + let contribution = &self .beacon_nodes .first_success(RequireSynced::No, |beacon_node| async move { let sync_contribution_data = SyncContributionData { @@ -335,35 +355,45 @@ impl SyncCommitteeService { })? .data; - // Make `SignedContributionAndProof`s - let signed_contributions = subnet_aggregators - .into_iter() - .filter_map(|(aggregator_index, aggregator_pk, selection_proof)| { - self.validator_store + // Create futures to produce signed contributions. + let signature_futures = subnet_aggregators.into_iter().map( + |(aggregator_index, aggregator_pk, selection_proof)| async move { + match self + .validator_store .produce_signed_contribution_and_proof( aggregator_index, - &aggregator_pk, + aggregator_pk, contribution.clone(), selection_proof, ) - .map_err(|e| { + .await + { + Ok(signed_contribution) => Some(signed_contribution), + Err(e) => { crit!( log, "Unable to sign sync committee contribution"; "slot" => slot, "error" => ?e, ); - }) - .ok() - }) + None + } + } + }, + ); + + // Execute all the futures in parallel, collecting any successful results. + let signed_contributions = &join_all(signature_futures) + .await + .into_iter() + .flatten() .collect::>(); // Publish to the beacon node. - let signed_contributions_slice = &signed_contributions; self.beacon_nodes .first_success(RequireSynced::No, |beacon_node| async move { beacon_node - .post_validator_contribution_and_proofs(signed_contributions_slice) + .post_validator_contribution_and_proofs(signed_contributions) .await }) .await diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 7100ee352..d7efa806a 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -1,6 +1,8 @@ use crate::{ - doppelganger_service::DoppelgangerService, http_metrics::metrics, + doppelganger_service::DoppelgangerService, + http_metrics::metrics, initialized_validators::InitializedValidators, + signing_method::{Error as SigningError, SignableMessage, SigningContext, SigningMethod}, }; use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString}; use parking_lot::{Mutex, RwLock}; @@ -11,12 +13,13 @@ use std::iter::FromIterator; use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; +use task_executor::TaskExecutor; use types::{ - attestation::Error as AttestationError, graffiti::GraffitiString, Attestation, BeaconBlock, - ChainSpec, Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, - SelectionProof, Signature, SignedAggregateAndProof, SignedBeaconBlock, - SignedContributionAndProof, SignedRoot, Slot, SyncCommitteeContribution, SyncCommitteeMessage, - SyncSelectionProof, SyncSubnetId, + attestation::Error as AttestationError, graffiti::GraffitiString, AggregateAndProof, + Attestation, BeaconBlock, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, Fork, + Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof, Signature, SignedAggregateAndProof, + SignedBeaconBlock, SignedContributionAndProof, Slot, SyncAggregatorSelectionData, + SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, }; use validator_dir::ValidatorDir; @@ -32,6 +35,13 @@ pub enum Error { GreaterThanCurrentSlot { slot: Slot, current_slot: Slot }, GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch }, UnableToSignAttestation(AttestationError), + UnableToSign(SigningError), +} + +impl From for Error { + fn from(e: SigningError) -> Self { + Error::UnableToSign(e) + } } /// Number of epochs of slashing protection history to keep. @@ -73,10 +83,14 @@ pub struct ValidatorStore { log: Logger, doppelganger_service: Option>, slot_clock: T, + task_executor: TaskExecutor, _phantom: PhantomData, } impl ValidatorStore { + // All arguments are different types. Making the fields `pub` is undesired. A builder seems + // unnecessary. + #[allow(clippy::too_many_arguments)] pub fn new( validators: InitializedValidators, slashing_protection: SlashingDatabase, @@ -84,6 +98,7 @@ impl ValidatorStore { spec: ChainSpec, doppelganger_service: Option>, slot_clock: T, + task_executor: TaskExecutor, log: Logger, ) -> Self { Self { @@ -95,6 +110,7 @@ impl ValidatorStore { log, doppelganger_service, slot_clock, + task_executor, _phantom: PhantomData, } } @@ -124,12 +140,6 @@ impl ValidatorStore { /// Insert a new validator to `self`, where the validator is represented by an EIP-2335 /// keystore on the filesystem. - /// - /// This function includes: - /// - /// - Add the validator definition to the YAML file, saving it to the filesystem. - /// - Enable validator with the slashing protection database. - /// - If `enable == true`, start performing duties for the validator. pub async fn add_validator_keystore>( &self, voting_keystore_path: P, @@ -144,14 +154,28 @@ impl ValidatorStore { ) .map_err(|e| format!("failed to create validator definitions: {:?}", e))?; + validator_def.enabled = enable; + + self.add_validator(validator_def).await + } + + /// Insert a new validator to `self`. + /// + /// This function includes: + /// + /// - Adding the validator definition to the YAML file, saving it to the filesystem. + /// - Enabling the validator with the slashing protection database. + /// - If `enable == true`, starting to perform duties for the validator. + pub async fn add_validator( + &self, + validator_def: ValidatorDefinition, + ) -> Result { let validator_pubkey = validator_def.voting_public_key.compress(); self.slashing_protection .register_validator(validator_pubkey) .map_err(|e| format!("failed to register validator: {:?}", e))?; - validator_def.enabled = enable; - if let Some(doppelganger_service) = &self.doppelganger_service { doppelganger_service .register_new_validator::(validator_pubkey, &self.slot_clock)?; @@ -260,63 +284,72 @@ impl ValidatorStore { self.spec.fork_at_epoch(epoch) } - /// Runs `func`, providing it access to the `Keypair` corresponding to `validator_pubkey`. - /// - /// This forms the canonical point for accessing the secret key of some validator. It is - /// structured as a `with_...` function since we need to pass-through a read-lock in order to - /// access the keypair. - /// - /// Access to keypairs might be restricted by other internal mechanisms (e.g., doppleganger - /// protection). + /// Returns a `SigningMethod` for `validator_pubkey` *only if* that validator is considered safe + /// by doppelganger protection. + fn doppelganger_checked_signing_method( + &self, + validator_pubkey: PublicKeyBytes, + ) -> Result, Error> { + if self.doppelganger_protection_allows_signing(validator_pubkey) { + self.validators + .read() + .signing_method(&validator_pubkey) + .ok_or(Error::UnknownPubkey(validator_pubkey)) + } else { + Err(Error::DoppelgangerProtected(validator_pubkey)) + } + } + + /// Returns a `SigningMethod` for `validator_pubkey` regardless of that validators doppelganger + /// protection status. /// /// ## Warning /// - /// This function takes a read-lock on `self.validators`. To prevent deadlocks, it is advised to - /// never take any sort of concurrency lock inside this function. - fn with_validator_keypair( + /// This method should only be used for signing non-slashable messages. + fn doppelganger_bypassed_signing_method( &self, validator_pubkey: PublicKeyBytes, - func: F, - ) -> Result - where - F: FnOnce(&Keypair) -> R, - { - // If the doppelganger service is active, check to ensure it explicitly permits signing by - // this validator. - if !self.doppelganger_protection_allows_signing(validator_pubkey) { - return Err(Error::DoppelgangerProtected(validator_pubkey)); - } - - let validators_lock = self.validators.read(); - - Ok(func( - validators_lock - .voting_keypair(&validator_pubkey) - .ok_or(Error::UnknownPubkey(validator_pubkey))?, - )) + ) -> Result, Error> { + self.validators + .read() + .signing_method(&validator_pubkey) + .ok_or(Error::UnknownPubkey(validator_pubkey)) } - pub fn randao_reveal( + fn signing_context(&self, domain: Domain, signing_epoch: Epoch) -> SigningContext { + SigningContext { + domain, + epoch: signing_epoch, + fork: self.fork(signing_epoch), + genesis_validators_root: self.genesis_validators_root, + } + } + + pub async fn randao_reveal( &self, validator_pubkey: PublicKeyBytes, - epoch: Epoch, + signing_epoch: Epoch, ) -> Result { - let domain = self.spec.get_domain( - epoch, - Domain::Randao, - &self.fork(epoch), - self.genesis_validators_root, - ); - let message = epoch.signing_root(domain); + let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + let signing_context = self.signing_context(Domain::Randao, signing_epoch); - self.with_validator_keypair(validator_pubkey, |keypair| keypair.sk.sign(message)) + let signature = signing_method + .get_signature::( + SignableMessage::RandaoReveal(signing_epoch), + signing_context, + &self.spec, + &self.task_executor, + ) + .await?; + + Ok(signature) } pub fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option { self.validators.read().graffiti(validator_pubkey) } - pub fn sign_block( + pub async fn sign_block( &self, validator_pubkey: PublicKeyBytes, block: BeaconBlock, @@ -336,19 +369,15 @@ impl ValidatorStore { }); } - // Check for slashing conditions. - let fork = self.fork(block.epoch()); - let domain = self.spec.get_domain( - block.epoch(), - Domain::BeaconProposer, - &fork, - self.genesis_validators_root, - ); + let signing_epoch = block.epoch(); + let signing_context = self.signing_context(Domain::BeaconProposer, signing_epoch); + let domain_hash = signing_context.domain_hash(&self.spec); + // Check for slashing conditions. let slashing_status = self.slashing_protection.check_and_insert_block_proposal( &validator_pubkey, &block.block_header(), - domain, + domain_hash, ); match slashing_status { @@ -356,9 +385,16 @@ impl ValidatorStore { Ok(Safe::Valid) => { metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SUCCESS]); - self.with_validator_keypair(validator_pubkey, move |keypair| { - block.sign(&keypair.sk, &fork, self.genesis_validators_root, &self.spec) - }) + let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + let signature = signing_method + .get_signature( + SignableMessage::BeaconBlock(&block), + signing_context, + &self.spec, + &self.task_executor, + ) + .await?; + Ok(SignedBeaconBlock::from_block(block, signature)) } Ok(Safe::SameData) => { warn!( @@ -390,7 +426,7 @@ impl ValidatorStore { } } - pub fn sign_attestation( + pub async fn sign_attestation( &self, validator_pubkey: PublicKeyBytes, validator_committee_position: usize, @@ -406,33 +442,30 @@ impl ValidatorStore { } // Checking for slashing conditions. - let fork = self.fork(attestation.data.target.epoch); - - let domain = self.spec.get_domain( - attestation.data.target.epoch, - Domain::BeaconAttester, - &fork, - self.genesis_validators_root, - ); + let signing_epoch = attestation.data.target.epoch; + let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch); + let domain_hash = signing_context.domain_hash(&self.spec); let slashing_status = self.slashing_protection.check_and_insert_attestation( &validator_pubkey, &attestation.data, - domain, + domain_hash, ); match slashing_status { // We can safely sign this attestation. Ok(Safe::Valid) => { - self.with_validator_keypair(validator_pubkey, |keypair| { - attestation.sign( - &keypair.sk, - validator_committee_position, - &fork, - self.genesis_validators_root, + let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + let signature = signing_method + .get_signature::( + SignableMessage::AttestationData(&attestation.data), + signing_context, &self.spec, + &self.task_executor, ) - })? - .map_err(Error::UnableToSignAttestation)?; + .await?; + attestation + .add_signature(&signature, validator_committee_position) + .map_err(Error::UnableToSignAttestation)?; metrics::inc_counter_vec(&metrics::SIGNED_ATTESTATIONS_TOTAL, &[metrics::SUCCESS]); @@ -482,149 +515,182 @@ impl ValidatorStore { /// /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be /// modified by actors other than the signing validator. - pub fn produce_signed_aggregate_and_proof( + pub async fn produce_signed_aggregate_and_proof( &self, validator_pubkey: PublicKeyBytes, - validator_index: u64, + aggregator_index: u64, aggregate: Attestation, selection_proof: SelectionProof, ) -> Result, Error> { - let fork = self.fork(aggregate.data.target.epoch); + let signing_epoch = aggregate.data.target.epoch; + let signing_context = self.signing_context(Domain::AggregateAndProof, signing_epoch); - let proof = self.with_validator_keypair(validator_pubkey, move |keypair| { - SignedAggregateAndProof::from_aggregate( - validator_index, - aggregate, - Some(selection_proof), - &keypair.sk, - &fork, - self.genesis_validators_root, + let message = AggregateAndProof { + aggregator_index, + aggregate, + selection_proof: selection_proof.into(), + }; + + let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + let signature = signing_method + .get_signature( + SignableMessage::SignedAggregateAndProof(&message), + signing_context, &self.spec, + &self.task_executor, ) - })?; + .await?; metrics::inc_counter_vec(&metrics::SIGNED_AGGREGATES_TOTAL, &[metrics::SUCCESS]); - Ok(proof) + Ok(SignedAggregateAndProof { message, signature }) } /// Produces a `SelectionProof` for the `slot`, signed by with corresponding secret key to /// `validator_pubkey`. - pub fn produce_selection_proof( + pub async fn produce_selection_proof( &self, validator_pubkey: PublicKeyBytes, slot: Slot, ) -> Result { - // Bypass the `with_validator_keypair` function. + let signing_epoch = slot.epoch(E::slots_per_epoch()); + let signing_context = self.signing_context(Domain::SelectionProof, signing_epoch); + + // Bypass the `with_validator_signing_method` function. // // This is because we don't care about doppelganger protection when it comes to selection // proofs. They are not slashable and we need them to subscribe to subnets on the BN. // // As long as we disallow `SignedAggregateAndProof` then these selection proofs will never // be published on the network. - let validators_lock = self.validators.read(); - let keypair = validators_lock - .voting_keypair(&validator_pubkey) - .ok_or(Error::UnknownPubkey(validator_pubkey))?; + let signing_method = self.doppelganger_bypassed_signing_method(validator_pubkey)?; - let proof = SelectionProof::new::( - slot, - &keypair.sk, - &self.fork(slot.epoch(E::slots_per_epoch())), - self.genesis_validators_root, - &self.spec, - ); + let signature = signing_method + .get_signature::( + SignableMessage::SelectionProof(slot), + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::UnableToSign)?; metrics::inc_counter_vec(&metrics::SIGNED_SELECTION_PROOFS_TOTAL, &[metrics::SUCCESS]); - Ok(proof) + Ok(signature.into()) } /// Produce a `SyncSelectionProof` for `slot` signed by the secret key of `validator_pubkey`. - pub fn produce_sync_selection_proof( + pub async fn produce_sync_selection_proof( &self, validator_pubkey: &PublicKeyBytes, slot: Slot, subnet_id: SyncSubnetId, ) -> Result { - // Bypass `with_validator_keypair`: sync committee messages are not slashable. - let validators = self.validators.read(); - let voting_keypair = validators - .voting_keypair(validator_pubkey) - .ok_or(Error::UnknownPubkey(*validator_pubkey))?; + let signing_epoch = slot.epoch(E::slots_per_epoch()); + let signing_context = + self.signing_context(Domain::SyncCommitteeSelectionProof, signing_epoch); + + // Bypass `with_validator_signing_method`: sync committee messages are not slashable. + let signing_method = self.doppelganger_bypassed_signing_method(*validator_pubkey)?; metrics::inc_counter_vec( &metrics::SIGNED_SYNC_SELECTION_PROOFS_TOTAL, &[metrics::SUCCESS], ); - Ok(SyncSelectionProof::new::( + let message = SyncAggregatorSelectionData { slot, - subnet_id.into(), - &voting_keypair.sk, - &self.fork(slot.epoch(E::slots_per_epoch())), - self.genesis_validators_root, - &self.spec, - )) + subcommittee_index: subnet_id.into(), + }; + + let signature = signing_method + .get_signature::( + SignableMessage::SyncSelectionProof(&message), + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::UnableToSign)?; + + Ok(signature.into()) } - pub fn produce_sync_committee_signature( + pub async fn produce_sync_committee_signature( &self, slot: Slot, beacon_block_root: Hash256, validator_index: u64, validator_pubkey: &PublicKeyBytes, ) -> Result { - // Bypass `with_validator_keypair`: sync committee messages are not slashable. - let validators = self.validators.read(); - let voting_keypair = validators - .voting_keypair(validator_pubkey) - .ok_or(Error::UnknownPubkey(*validator_pubkey))?; + let signing_epoch = slot.epoch(E::slots_per_epoch()); + let signing_context = self.signing_context(Domain::SyncCommittee, signing_epoch); + + // Bypass `with_validator_signing_method`: sync committee messages are not slashable. + let signing_method = self.doppelganger_bypassed_signing_method(*validator_pubkey)?; + + let signature = signing_method + .get_signature::( + SignableMessage::SyncCommitteeSignature { + beacon_block_root, + slot, + }, + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::UnableToSign)?; metrics::inc_counter_vec( &metrics::SIGNED_SYNC_COMMITTEE_MESSAGES_TOTAL, &[metrics::SUCCESS], ); - Ok(SyncCommitteeMessage::new::( + Ok(SyncCommitteeMessage { slot, beacon_block_root, validator_index, - &voting_keypair.sk, - &self.fork(slot.epoch(E::slots_per_epoch())), - self.genesis_validators_root, - &self.spec, - )) + signature, + }) } - pub fn produce_signed_contribution_and_proof( + pub async fn produce_signed_contribution_and_proof( &self, aggregator_index: u64, - aggregator_pubkey: &PublicKeyBytes, + aggregator_pubkey: PublicKeyBytes, contribution: SyncCommitteeContribution, selection_proof: SyncSelectionProof, ) -> Result, Error> { - // Bypass `with_validator_keypair`: sync committee messages are not slashable. - let validators = self.validators.read(); - let voting_keypair = validators - .voting_keypair(aggregator_pubkey) - .ok_or(Error::UnknownPubkey(*aggregator_pubkey))?; - let fork = self.fork(contribution.slot.epoch(E::slots_per_epoch())); + let signing_epoch = contribution.slot.epoch(E::slots_per_epoch()); + let signing_context = self.signing_context(Domain::ContributionAndProof, signing_epoch); + + // Bypass `with_validator_signing_method`: sync committee messages are not slashable. + let signing_method = self.doppelganger_bypassed_signing_method(aggregator_pubkey)?; + + let message = ContributionAndProof { + aggregator_index, + contribution, + selection_proof: selection_proof.into(), + }; + + let signature = signing_method + .get_signature( + SignableMessage::SignedContributionAndProof(&message), + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::UnableToSign)?; metrics::inc_counter_vec( &metrics::SIGNED_SYNC_COMMITTEE_CONTRIBUTIONS_TOTAL, &[metrics::SUCCESS], ); - Ok(SignedContributionAndProof::from_aggregate( - aggregator_index, - contribution, - Some(selection_proof), - &voting_keypair.sk, - &fork, - self.genesis_validators_root, - &self.spec, - )) + Ok(SignedContributionAndProof { message, signature }) } /// Prune the slashing protection database so that it remains performant.