From 06e310c4eb58b0c85f2ee3cc1aa1dc094c77f38d Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 19 Oct 2021 00:30:38 +0000 Subject: [PATCH] Export slashing protection per validator (#2674) ## Issue Addressed Part of https://github.com/sigp/lighthouse/issues/2557 ## Proposed Changes Refactor the slashing protection export so that it can export data for a subset of validators. This is the last remaining building block required for supporting the standard validator API (which I'll start to build atop this branch) ## Additional Info Built on and requires #2598 --- .../src/validator/slashing_protection.rs | 29 ++- .../src/extra_interchange_tests.rs | 75 ++++++++ .../slashing_protection/src/lib.rs | 1 + .../src/slashing_database.rs | 168 +++++++++++------- .../slashing_protection/src/test_utils.rs | 38 ++-- 5 files changed, 234 insertions(+), 77 deletions(-) create mode 100644 validator_client/slashing_protection/src/extra_interchange_tests.rs diff --git a/account_manager/src/validator/slashing_protection.rs b/account_manager/src/validator/slashing_protection.rs index 193b288a1..67902b7d2 100644 --- a/account_manager/src/validator/slashing_protection.rs +++ b/account_manager/src/validator/slashing_protection.rs @@ -6,7 +6,8 @@ use slashing_protection::{ }; use std::fs::File; use std::path::PathBuf; -use types::{BeaconState, Epoch, EthSpec, Slot}; +use std::str::FromStr; +use types::{BeaconState, Epoch, EthSpec, PublicKeyBytes, Slot}; pub const CMD: &str = "slashing-protection"; pub const IMPORT_CMD: &str = "import"; @@ -16,6 +17,7 @@ pub const IMPORT_FILE_ARG: &str = "IMPORT-FILE"; pub const EXPORT_FILE_ARG: &str = "EXPORT-FILE"; pub const MINIFY_FLAG: &str = "minify"; +pub const PUBKEYS_FLAG: &str = "pubkeys"; pub fn cli_app<'a, 'b>() -> App<'a, 'b> { App::new(CMD) @@ -49,6 +51,16 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .value_name("FILE") .help("The filename to export the interchange file to"), ) + .arg( + Arg::with_name(PUBKEYS_FLAG) + .long(PUBKEYS_FLAG) + .takes_value(true) + .value_name("PUBKEYS") + .help( + "List of public keys to export history for. Keys should be 0x-prefixed, \ + comma-separated. All known keys will be exported if omitted", + ), + ) .arg( Arg::with_name(MINIFY_FLAG) .long(MINIFY_FLAG) @@ -203,6 +215,19 @@ pub fn cli_run( let export_filename: PathBuf = clap_utils::parse_required(matches, EXPORT_FILE_ARG)?; let minify: bool = clap_utils::parse_required(matches, MINIFY_FLAG)?; + let selected_pubkeys = if let Some(pubkeys) = + clap_utils::parse_optional::(matches, PUBKEYS_FLAG)? + { + let pubkeys = pubkeys + .split(',') + .map(PublicKeyBytes::from_str) + .collect::, _>>() + .map_err(|e| format!("Invalid --{} value: {:?}", PUBKEYS_FLAG, e))?; + Some(pubkeys) + } else { + None + }; + if !slashing_protection_db_path.exists() { return Err(format!( "No slashing protection database exists at: {}", @@ -220,7 +245,7 @@ pub fn cli_run( })?; let mut interchange = slashing_protection_database - .export_interchange_info(genesis_validators_root) + .export_interchange_info(genesis_validators_root, selected_pubkeys.as_deref()) .map_err(|e| format!("Error during export: {:?}", e))?; if minify { diff --git a/validator_client/slashing_protection/src/extra_interchange_tests.rs b/validator_client/slashing_protection/src/extra_interchange_tests.rs new file mode 100644 index 000000000..dd1c18821 --- /dev/null +++ b/validator_client/slashing_protection/src/extra_interchange_tests.rs @@ -0,0 +1,75 @@ +#![cfg(test)] + +use crate::test_utils::pubkey; +use crate::*; +use tempfile::tempdir; + +#[test] +fn export_non_existent_key() { + let dir = tempdir().unwrap(); + let slashing_db_file = dir.path().join("slashing_protection.sqlite"); + let slashing_db = SlashingDatabase::create(&slashing_db_file).unwrap(); + + let key1 = pubkey(1); + let key2 = pubkey(2); + + // Exporting two non-existent keys should fail on the first one. + let err = slashing_db + .export_interchange_info(Hash256::zero(), Some(&[key1, key2])) + .unwrap_err(); + assert!(matches!( + err, + InterchangeError::NotSafe(NotSafe::UnregisteredValidator(k)) if k == key1 + )); + + slashing_db.register_validator(key1).unwrap(); + + // Exporting one key that exists and one that doesn't should fail on the one that doesn't. + let err = slashing_db + .export_interchange_info(Hash256::zero(), Some(&[key1, key2])) + .unwrap_err(); + assert!(matches!( + err, + InterchangeError::NotSafe(NotSafe::UnregisteredValidator(k)) if k == key2 + )); + + // Exporting only keys that exist should work. + let interchange = slashing_db + .export_interchange_info(Hash256::zero(), Some(&[key1])) + .unwrap(); + assert_eq!(interchange.data.len(), 1); + assert_eq!(interchange.data[0].pubkey, key1); +} + +#[test] +fn export_same_key_twice() { + let dir = tempdir().unwrap(); + let slashing_db_file = dir.path().join("slashing_protection.sqlite"); + let slashing_db = SlashingDatabase::create(&slashing_db_file).unwrap(); + + let key1 = pubkey(1); + + slashing_db.register_validator(key1).unwrap(); + + let export_single = slashing_db + .export_interchange_info(Hash256::zero(), Some(&[key1])) + .unwrap(); + let export_double = slashing_db + .export_interchange_info(Hash256::zero(), Some(&[key1, key1])) + .unwrap(); + + assert_eq!(export_single.data.len(), 1); + + // Allow the same data to be exported twice, this is harmless, albeit slightly inefficient. + assert_eq!(export_double.data.len(), 2); + assert_eq!(export_double.data[0], export_double.data[1]); + + // The data should be identical to the single export. + assert_eq!(export_double.data[0], export_single.data[0]); + + // The minified versions should be equal too. + assert_eq!( + export_single.minify().unwrap(), + export_double.minify().unwrap() + ); +} diff --git a/validator_client/slashing_protection/src/lib.rs b/validator_client/slashing_protection/src/lib.rs index 02941be29..858acbfe9 100644 --- a/validator_client/slashing_protection/src/lib.rs +++ b/validator_client/slashing_protection/src/lib.rs @@ -1,5 +1,6 @@ mod attestation_tests; mod block_tests; +mod extra_interchange_tests; pub mod interchange; pub mod interchange_test; mod parallel_tests; diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index c26d22ca1..725aa6057 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -129,6 +129,19 @@ impl SlashingDatabase { Ok(()) } + /// Execute a database transaction as a closure, committing if `f` returns `Ok`. + pub fn with_transaction(&self, f: F) -> Result + where + F: FnOnce(&Transaction) -> Result, + E: From, + { + let mut conn = self.conn_pool.get().map_err(NotSafe::from)?; + let txn = conn.transaction().map_err(NotSafe::from)?; + let value = f(&txn)?; + txn.commit().map_err(NotSafe::from)?; + Ok(value) + } + /// Register a validator with the slashing protection database. /// /// This allows the validator to record their signatures in the database, and check @@ -142,11 +155,7 @@ impl SlashingDatabase { &self, public_keys: impl Iterator, ) -> Result<(), NotSafe> { - let mut conn = self.conn_pool.get()?; - let txn = conn.transaction()?; - self.register_validators_in_txn(public_keys, &txn)?; - txn.commit()?; - Ok(()) + self.with_transaction(|txn| self.register_validators_in_txn(public_keys, txn)) } /// Register multiple validators inside the given transaction. @@ -177,6 +186,23 @@ impl SlashingDatabase { .try_for_each(|public_key| self.get_validator_id_in_txn(&txn, public_key).map(|_| ())) } + /// List the internal validator ID and public key of every registered validator. + pub fn list_all_registered_validators( + &self, + txn: &Transaction, + ) -> Result, InterchangeError> { + txn.prepare("SELECT id, public_key FROM validators ORDER BY id ASC")? + .query_and_then(params![], |row| { + let validator_id = row.get(0)?; + let pubkey_str: String = row.get(1)?; + let pubkey = pubkey_str + .parse() + .map_err(InterchangeError::InvalidPubkey)?; + Ok((validator_id, pubkey)) + })? + .collect() + } + /// Get the database-internal ID for a validator. /// /// This is NOT the same as a validator index, and depends on the ordering that validators @@ -694,81 +720,101 @@ impl SlashingDatabase { } } - pub fn export_interchange_info( + pub fn export_all_interchange_info( &self, genesis_validators_root: Hash256, ) -> Result { - use std::collections::BTreeMap; + self.export_interchange_info(genesis_validators_root, None) + } + pub fn export_interchange_info( + &self, + genesis_validators_root: Hash256, + selected_pubkeys: Option<&[PublicKeyBytes]>, + ) -> Result { let mut conn = self.conn_pool.get()?; - let txn = conn.transaction()?; + let txn = &conn.transaction()?; - // Map from internal validator pubkey to blocks and attestation for that pubkey. - let mut data: BTreeMap, Vec)> = - BTreeMap::new(); - - txn.prepare( - "SELECT public_key, slot, signing_root - FROM signed_blocks, validators - WHERE signed_blocks.validator_id = validators.id - ORDER BY slot ASC", - )? - .query_and_then(params![], |row| { - let validator_pubkey: String = row.get(0)?; - let slot = row.get(1)?; - let signing_root = signing_root_from_row(2, row)?.to_hash256(); - let signed_block = InterchangeBlock { slot, signing_root }; - data.entry(validator_pubkey) - .or_insert_with(|| (vec![], vec![])) - .0 - .push(signed_block); - Ok(()) - })? - .collect::>()?; - - txn.prepare( - "SELECT public_key, source_epoch, target_epoch, signing_root - FROM signed_attestations, validators - WHERE signed_attestations.validator_id = validators.id - ORDER BY source_epoch ASC, target_epoch ASC", - )? - .query_and_then(params![], |row| { - let validator_pubkey: String = row.get(0)?; - let source_epoch = row.get(1)?; - let target_epoch = row.get(2)?; - let signing_root = signing_root_from_row(3, row)?.to_hash256(); - let signed_attestation = InterchangeAttestation { - source_epoch, - target_epoch, - signing_root, - }; - data.entry(validator_pubkey) - .or_insert_with(|| (vec![], vec![])) - .1 - .push(signed_attestation); - Ok(()) - })? - .collect::>()?; - - let metadata = InterchangeMetadata { - interchange_format_version: SUPPORTED_INTERCHANGE_FORMAT_VERSION, - genesis_validators_root, + // Determine the validator IDs and public keys to export data for. + let to_export = if let Some(selected_pubkeys) = selected_pubkeys { + selected_pubkeys + .iter() + .map(|pubkey| { + let id = self.get_validator_id_in_txn(txn, pubkey)?; + Ok((id, *pubkey)) + }) + .collect::>()? + } else { + self.list_all_registered_validators(txn)? }; - let data = data + let data = to_export .into_iter() - .map(|(pubkey, (signed_blocks, signed_attestations))| { + .map(|(validator_id, pubkey)| { + let signed_blocks = + self.export_interchange_blocks_for_validator(validator_id, txn)?; + let signed_attestations = + self.export_interchange_attestations_for_validator(validator_id, txn)?; Ok(InterchangeData { - pubkey: pubkey.parse().map_err(InterchangeError::InvalidPubkey)?, + pubkey, signed_blocks, signed_attestations, }) }) .collect::>()?; + let metadata = InterchangeMetadata { + interchange_format_version: SUPPORTED_INTERCHANGE_FORMAT_VERSION, + genesis_validators_root, + }; + Ok(Interchange { metadata, data }) } + fn export_interchange_blocks_for_validator( + &self, + validator_id: i64, + txn: &Transaction, + ) -> Result, InterchangeError> { + txn.prepare( + "SELECT slot, signing_root + FROM signed_blocks + WHERE signed_blocks.validator_id = ?1 + ORDER BY slot ASC", + )? + .query_and_then(params![validator_id], |row| { + let slot = row.get(0)?; + let signing_root = signing_root_from_row(1, row)?.to_hash256(); + Ok(InterchangeBlock { slot, signing_root }) + })? + .collect() + } + + fn export_interchange_attestations_for_validator( + &self, + validator_id: i64, + txn: &Transaction, + ) -> Result, InterchangeError> { + txn.prepare( + "SELECT source_epoch, target_epoch, signing_root + FROM signed_attestations + WHERE signed_attestations.validator_id = ?1 + ORDER BY source_epoch ASC, target_epoch ASC", + )? + .query_and_then(params![validator_id], |row| { + let source_epoch = row.get(0)?; + let target_epoch = row.get(1)?; + let signing_root = signing_root_from_row(2, row)?.to_hash256(); + let signed_attestation = InterchangeAttestation { + source_epoch, + target_epoch, + signing_root, + }; + Ok(signed_attestation) + })? + .collect() + } + /// Remove all blocks for `public_key` with slots less than `new_min_slot`. fn prune_signed_blocks( &self, diff --git a/validator_client/slashing_protection/src/test_utils.rs b/validator_client/slashing_protection/src/test_utils.rs index 8cb802378..3df892ecd 100644 --- a/validator_client/slashing_protection/src/test_utils.rs +++ b/validator_client/slashing_protection/src/test_utils.rs @@ -73,16 +73,6 @@ impl Default for StreamTest { } } -impl StreamTest { - /// The number of test cases that are expected to pass processing successfully. - fn num_expected_successes(&self) -> usize { - self.cases - .iter() - .filter(|case| case.expected.is_ok()) - .count() - } -} - impl StreamTest { pub fn run(&self) { let dir = tempdir().unwrap(); @@ -93,6 +83,8 @@ impl StreamTest { slashing_db.register_validator(*pubkey).unwrap(); } + check_registration_invariants(&slashing_db, &self.registered_validators); + for (i, test) in self.cases.iter().enumerate() { assert_eq!( slashing_db.check_and_insert_attestation(&test.pubkey, &test.data, test.domain), @@ -102,7 +94,7 @@ impl StreamTest { ); } - roundtrip_database(&dir, &slashing_db, self.num_expected_successes() == 0); + roundtrip_database(&dir, &slashing_db, self.registered_validators.is_empty()); } } @@ -116,6 +108,8 @@ impl StreamTest { slashing_db.register_validator(*pubkey).unwrap(); } + check_registration_invariants(&slashing_db, &self.registered_validators); + for (i, test) in self.cases.iter().enumerate() { assert_eq!( slashing_db.check_and_insert_block_proposal(&test.pubkey, &test.data, test.domain), @@ -125,7 +119,7 @@ impl StreamTest { ); } - roundtrip_database(&dir, &slashing_db, self.num_expected_successes() == 0); + roundtrip_database(&dir, &slashing_db, self.registered_validators.is_empty()); } } @@ -133,7 +127,7 @@ impl StreamTest { // the implicit minification done on import. fn roundtrip_database(dir: &TempDir, db: &SlashingDatabase, is_empty: bool) { let exported = db - .export_interchange_info(DEFAULT_GENESIS_VALIDATORS_ROOT) + .export_all_interchange_info(DEFAULT_GENESIS_VALIDATORS_ROOT) .unwrap(); let new_db = SlashingDatabase::create(&dir.path().join("roundtrip_slashing_protection.sqlite")).unwrap(); @@ -141,7 +135,7 @@ fn roundtrip_database(dir: &TempDir, db: &SlashingDatabase, is_empty: bool) { .import_interchange_info(exported.clone(), DEFAULT_GENESIS_VALIDATORS_ROOT) .unwrap(); let reexported = new_db - .export_interchange_info(DEFAULT_GENESIS_VALIDATORS_ROOT) + .export_all_interchange_info(DEFAULT_GENESIS_VALIDATORS_ROOT) .unwrap(); assert!(exported @@ -150,3 +144,19 @@ fn roundtrip_database(dir: &TempDir, db: &SlashingDatabase, is_empty: bool) { .equiv(&reexported.minify().unwrap())); assert_eq!(is_empty, exported.is_empty()); } + +fn check_registration_invariants( + slashing_db: &SlashingDatabase, + registered_validators: &[PublicKeyBytes], +) { + slashing_db + .check_validator_registrations(registered_validators.iter()) + .unwrap(); + let registered_list = slashing_db + .with_transaction(|txn| slashing_db.list_all_registered_validators(txn)) + .unwrap() + .into_iter() + .map(|(_, pubkey)| pubkey) + .collect::>(); + assert_eq!(registered_validators, registered_list); +}