diff --git a/validator_client/slashing_protection/src/lib.rs b/validator_client/slashing_protection/src/lib.rs index 774cbc934..06c117ab1 100644 --- a/validator_client/slashing_protection/src/lib.rs +++ b/validator_client/slashing_protection/src/lib.rs @@ -104,7 +104,8 @@ impl From for NotSafe { impl From for NotSafe { fn from(error: r2d2::Error) -> Self { - NotSafe::SQLPoolError(format!("{:?}", error)) + // Use `Display` impl to print "timed out waiting for connection" + NotSafe::SQLPoolError(format!("{}", error)) } } diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index 20e607e64..01a9210c5 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -662,15 +662,16 @@ impl SlashingDatabase { )?; } - // Prune attestations less than the min source and target from this interchange file. - // See the rationale for blocks above. - if let Some((new_min_source, new_min_target)) = record + // Prune attestations less than the min target from this interchange file. + // See the rationale for blocks above, and the doc comment for `prune_signed_attestations` + // for why we don't need to separately prune for the min source. + if let Some(new_min_target) = record .signed_attestations .iter() - .map(|attestation| (attestation.source_epoch, attestation.target_epoch)) + .map(|attestation| attestation.target_epoch) .min() { - self.prune_signed_attestations(&record.pubkey, new_min_source, new_min_target, txn)?; + self.prune_signed_attestations(&record.pubkey, new_min_target, txn)?; } let summary = self.validator_summary(&record.pubkey, txn)?; @@ -754,7 +755,7 @@ impl SlashingDatabase { } /// Remove all blocks for `public_key` with slots less than `new_min_slot`. - pub fn prune_signed_blocks( + fn prune_signed_blocks( &self, public_key: &PublicKey, new_min_slot: Slot, @@ -764,39 +765,82 @@ impl SlashingDatabase { txn.execute( "DELETE FROM signed_blocks - WHERE validator_id = ?1 AND slot < ?2", + WHERE + validator_id = ?1 AND + slot < ?2 AND + slot < (SELECT MAX(slot) + FROM signed_blocks + WHERE validator_id = ?1)", params![validator_id, new_min_slot], )?; Ok(()) } - /// Remove all attestations for `public_key` with - /// `(source, target) < (new_min_source, new_min_target)`. - pub fn prune_signed_attestations( + /// Prune the signed blocks table for the given public keys. + pub fn prune_all_signed_blocks<'a>( + &self, + mut public_keys: impl Iterator, + new_min_slot: Slot, + ) -> Result<(), NotSafe> { + let mut conn = self.conn_pool.get()?; + let txn = conn.transaction()?; + public_keys.try_for_each(|pubkey| self.prune_signed_blocks(pubkey, new_min_slot, &txn))?; + txn.commit()?; + Ok(()) + } + + /// Remove all attestations for `public_key` with `target < new_min_target`. + /// + /// Pruning every attestation with target less than `new_min_target` also has the effect of + /// making the new minimum source the source of the attestation with `target == new_min_target` + /// (if any exists). This is exactly what's required for pruning after importing an interchange + /// file, whereby we want to update the new minimum source to the min source from the + /// interchange. + /// + /// If the `new_min_target` was plucked out of thin air and doesn't necessarily correspond to + /// an extant attestation then this function is still safe. It will never delete *all* the + /// attestations in the database. + fn prune_signed_attestations( &self, public_key: &PublicKey, - new_min_source: Epoch, new_min_target: Epoch, txn: &Transaction, ) -> Result<(), NotSafe> { let validator_id = self.get_validator_id_in_txn(txn, public_key)?; - // Delete attestations with source *and* target less than the minimums. - // Assuming `(new_min_source, new_min_target)` was successfully - // inserted into the database, then any other attestation in the database - // can't have just its source or just its target less than the new minimum. - // I.e. the following holds: - // a.source < new_min_source <--> a.target < new_min_target + // The following holds: + // a.target < new_min_target --> a.source <= new_min_source + // + // The `MAX(target_epoch)` acts as a guard to prevent accidentally clearing the DB. txn.execute( "DELETE FROM signed_attestations - WHERE validator_id = ?1 AND source_epoch < ?2 AND target_epoch < ?3", - params![validator_id, new_min_source, new_min_target], + WHERE + validator_id = ?1 AND + target_epoch < ?2 AND + target_epoch < (SELECT MAX(target_epoch) + FROM signed_attestations + WHERE validator_id = ?1)", + params![validator_id, new_min_target], )?; Ok(()) } + /// Prune the signed attestations table for the given validator keys. + pub fn prune_all_signed_attestations<'a>( + &self, + mut public_keys: impl Iterator, + new_min_target: Epoch, + ) -> Result<(), NotSafe> { + let mut conn = self.conn_pool.get()?; + let txn = conn.transaction()?; + public_keys + .try_for_each(|pubkey| self.prune_signed_attestations(pubkey, new_min_target, &txn))?; + txn.commit()?; + Ok(()) + } + pub fn num_validator_rows(&self) -> Result { let mut conn = self.conn_pool.get()?; let txn = conn.transaction()?; diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 97d2496b1..1a1b8ae84 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -222,6 +222,11 @@ impl AttestationService { ); }); + // Schedule pruning of the slashing protection database once all unaggregated + // attestations have (hopefully) been signed, i.e. at the same time as aggregate + // production. + self.spawn_slashing_protection_pruning_task(slot, aggregate_production_instant); + Ok(()) } @@ -566,6 +571,32 @@ impl AttestationService { Ok(()) } + + /// Spawn a blocking task to run the slashing protection pruning process. + /// + /// Start the task at `pruning_instant` to avoid interference with other tasks. + fn spawn_slashing_protection_pruning_task(&self, slot: Slot, pruning_instant: Instant) { + let attestation_service = self.clone(); + let executor = self.inner.context.executor.clone(); + let current_epoch = slot.epoch(E::slots_per_epoch()); + + // Wait for `pruning_instant` in a regular task, and then switch to a blocking one. + self.inner.context.executor.spawn( + async move { + sleep_until(pruning_instant).await; + + executor.spawn_blocking( + move || { + attestation_service + .validator_store + .prune_slashing_protection_db(current_epoch, false) + }, + "slashing_protection_pruning", + ) + }, + "slashing_protection_pre_pruning", + ); + } } #[cfg(test)] diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index d60592f1b..5a6b993ab 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -65,6 +65,10 @@ lazy_static::lazy_static! { "Duration to perform attestation service tasks", &["task"] ); + pub static ref SLASHING_PROTECTION_PRUNE_TIMES: Result = try_create_histogram( + "vc_slashing_protection_prune_times_seconds", + "Time required to prune the slashing protection DB", + ); pub static ref BLOCK_SERVICE_TIMES: Result = try_create_histogram_vec( "vc_beacon_block_service_task_times_seconds", "Duration to perform beacon block service tasks", diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 00f48538a..edc873efe 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -277,6 +277,13 @@ impl ProductionValidatorClient { "voting_validators" => validator_store.num_voting_validators() ); + // Perform pruning of the slashing protection database on start-up. In case the database is + // oversized from having not been pruned (by a prior version) we don't want to prune + // concurrently, as it will hog the lock and cause the attestation service to spew CRITs. + if let Some(slot) = slot_clock.now() { + validator_store.prune_slashing_protection_db(slot.epoch(T::slots_per_epoch()), true); + } + let duties_service = DutiesServiceBuilder::new() .slot_clock(slot_clock.clone()) .validator_store(validator_store.clone()) diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 63b752e10..f152f411f 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -2,9 +2,9 @@ use crate::{ fork_service::ForkService, http_metrics::metrics, initialized_validators::InitializedValidators, }; use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString}; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use slashing_protection::{NotSafe, Safe, SlashingDatabase}; -use slog::{crit, error, warn, Logger}; +use slog::{crit, error, info, warn, Logger}; use slot_clock::SlotClock; use std::path::Path; use std::sync::Arc; @@ -15,6 +15,11 @@ use types::{ }; use validator_dir::ValidatorDir; +/// Number of epochs of slashing protection history to keep. +/// +/// This acts as a maximum safe-guard against clock drift. +const SLASHING_PROTECTION_HISTORY_EPOCHS: u64 = 512; + struct LocalValidator { validator_dir: ValidatorDir, voting_keypair: Keypair, @@ -44,6 +49,7 @@ impl PartialEq for LocalValidator { pub struct ValidatorStore { validators: Arc>, slashing_protection: SlashingDatabase, + slashing_protection_last_prune: Arc>, genesis_validators_root: Hash256, spec: Arc, log: Logger, @@ -63,6 +69,7 @@ impl ValidatorStore { Self { validators: Arc::new(RwLock::new(validators)), slashing_protection, + slashing_protection_last_prune: Arc::new(Mutex::new(Epoch::new(0))), genesis_validators_root, spec: Arc::new(spec), log, @@ -359,4 +366,65 @@ impl ValidatorStore { &self.spec, )) } + + /// Prune the slashing protection database so that it remains performant. + /// + /// This function will only do actual pruning periodically, so it should usually be + /// cheap to call. The `first_run` flag can be used to print a more verbose message when pruning + /// runs. + pub fn prune_slashing_protection_db(&self, current_epoch: Epoch, first_run: bool) { + // Attempt to prune every SLASHING_PROTECTION_HISTORY_EPOCHs, with a tolerance for + // missing the epoch that aligns exactly. + let mut last_prune = self.slashing_protection_last_prune.lock(); + if current_epoch / SLASHING_PROTECTION_HISTORY_EPOCHS + <= *last_prune / SLASHING_PROTECTION_HISTORY_EPOCHS + { + return; + } + + if first_run { + info!( + self.log, + "Pruning slashing protection DB"; + "epoch" => current_epoch, + "msg" => "pruning may take several minutes the first time it runs" + ); + } else { + info!(self.log, "Pruning slashing protection DB"; "epoch" => current_epoch); + } + + let _timer = metrics::start_timer(&metrics::SLASHING_PROTECTION_PRUNE_TIMES); + + let new_min_target_epoch = current_epoch.saturating_sub(SLASHING_PROTECTION_HISTORY_EPOCHS); + let new_min_slot = new_min_target_epoch.start_slot(E::slots_per_epoch()); + + let validators = self.validators.read(); + if let Err(e) = self + .slashing_protection + .prune_all_signed_attestations(validators.iter_voting_pubkeys(), new_min_target_epoch) + { + error!( + self.log, + "Error during pruning of signed attestations"; + "error" => ?e, + ); + return; + } + + if let Err(e) = self + .slashing_protection + .prune_all_signed_blocks(validators.iter_voting_pubkeys(), new_min_slot) + { + error!( + self.log, + "Error during pruning of signed blocks"; + "error" => ?e, + ); + return; + } + + *last_prune = current_epoch; + + info!(self.log, "Completed pruning of slashing protection DB"); + } }