From afd4786c59e0ae39884b8e06f76e7315d9c0b17a Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 24 Feb 2021 23:51:04 +0000 Subject: [PATCH] Prune slashing protection DB (#2194) ## Proposed Changes Prune the slashing protection database so that it doesn't exhibit unbounded growth. Prune by dropping attestations and blocks from more than 512 epochs ago, relying on the guards that prevent signing messages with slots or epochs less than the minimum recorded in the DB. The pruning process is potentially time-consuming, so it's scheduled to run only every 512 epochs, in the last 2/3rds of a slot. This gives it at least 4 seconds to run without impacting other signing, which I think should be sufficient. I've seen it run for several minutes (yikes!) on our Pyrmont nodes, but I suspect that 1) this will only occur on the first run when the database is still huge, and 2) no other production users will be impacted because they don't have enough validators per node. Pruning also happens at start-up, as I figured this is a fairly infrequent event, and if a user is experiencing problems with the VC related to pruning, it's nice to be able to trigger it with a quick restart. Users are also conditioned to not mind missing a few attestations during a restart. We need to include a note in the release notes that users may see the message `timed out waiting for connection` the first time they prune a huge database, but that this is totally fine and to be expected (the VC will miss those attestations in the meantime). I'm also open to making this opt-in for now, although the sooner we get users doing it, the less painful it will be: prune early, prune often!
--- .../slashing_protection/src/lib.rs | 3 +- .../src/slashing_database.rs | 82 ++++++++++++++----- validator_client/src/attestation_service.rs | 31 +++++++ validator_client/src/http_metrics/metrics.rs | 4 + validator_client/src/lib.rs | 7 ++ validator_client/src/validator_store.rs | 72 +++++++++++++++- 6 files changed, 177 insertions(+), 22 deletions(-) diff --git a/validator_client/slashing_protection/src/lib.rs b/validator_client/slashing_protection/src/lib.rs index 774cbc934..06c117ab1 100644 --- a/validator_client/slashing_protection/src/lib.rs +++ b/validator_client/slashing_protection/src/lib.rs @@ -104,7 +104,8 @@ impl From for NotSafe { impl From for NotSafe { fn from(error: r2d2::Error) -> Self { - NotSafe::SQLPoolError(format!("{:?}", error)) + // Use `Display` impl to print "timed out waiting for connection" + NotSafe::SQLPoolError(format!("{}", error)) } } diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index 20e607e64..01a9210c5 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -662,15 +662,16 @@ impl SlashingDatabase { )?; } - // Prune attestations less than the min source and target from this interchange file. - // See the rationale for blocks above. - if let Some((new_min_source, new_min_target)) = record + // Prune attestations less than the min target from this interchange file. + // See the rationale for blocks above, and the doc comment for `prune_signed_attestations` + // for why we don't need to separately prune for the min source. 
+ if let Some(new_min_target) = record .signed_attestations .iter() - .map(|attestation| (attestation.source_epoch, attestation.target_epoch)) + .map(|attestation| attestation.target_epoch) .min() { - self.prune_signed_attestations(&record.pubkey, new_min_source, new_min_target, txn)?; + self.prune_signed_attestations(&record.pubkey, new_min_target, txn)?; } let summary = self.validator_summary(&record.pubkey, txn)?; @@ -754,7 +755,7 @@ impl SlashingDatabase { } /// Remove all blocks for `public_key` with slots less than `new_min_slot`. - pub fn prune_signed_blocks( + fn prune_signed_blocks( &self, public_key: &PublicKey, new_min_slot: Slot, @@ -764,39 +765,82 @@ impl SlashingDatabase { txn.execute( "DELETE FROM signed_blocks - WHERE validator_id = ?1 AND slot < ?2", + WHERE + validator_id = ?1 AND + slot < ?2 AND + slot < (SELECT MAX(slot) + FROM signed_blocks + WHERE validator_id = ?1)", params![validator_id, new_min_slot], )?; Ok(()) } - /// Remove all attestations for `public_key` with - /// `(source, target) < (new_min_source, new_min_target)`. - pub fn prune_signed_attestations( + /// Prune the signed blocks table for the given public keys. + pub fn prune_all_signed_blocks<'a>( + &self, + mut public_keys: impl Iterator, + new_min_slot: Slot, + ) -> Result<(), NotSafe> { + let mut conn = self.conn_pool.get()?; + let txn = conn.transaction()?; + public_keys.try_for_each(|pubkey| self.prune_signed_blocks(pubkey, new_min_slot, &txn))?; + txn.commit()?; + Ok(()) + } + + /// Remove all attestations for `public_key` with `target < new_min_target`. + /// + /// Pruning every attestation with target less than `new_min_target` also has the effect of + /// making the new minimum source the source of the attestation with `target == new_min_target` + /// (if any exists). This is exactly what's required for pruning after importing an interchange + /// file, whereby we want to update the new minimum source to the min source from the + /// interchange. 
+ /// + /// If the `new_min_target` was plucked out of thin air and doesn't necessarily correspond to + /// an extant attestation then this function is still safe. It will never delete *all* the + /// attestations in the database. + fn prune_signed_attestations( &self, public_key: &PublicKey, - new_min_source: Epoch, new_min_target: Epoch, txn: &Transaction, ) -> Result<(), NotSafe> { let validator_id = self.get_validator_id_in_txn(txn, public_key)?; - // Delete attestations with source *and* target less than the minimums. - // Assuming `(new_min_source, new_min_target)` was successfully - // inserted into the database, then any other attestation in the database - // can't have just its source or just its target less than the new minimum. - // I.e. the following holds: - // a.source < new_min_source <--> a.target < new_min_target + // The following holds: + // a.target < new_min_target --> a.source <= new_min_source + // + // The `MAX(target_epoch)` acts as a guard to prevent accidentally clearing the DB. txn.execute( "DELETE FROM signed_attestations - WHERE validator_id = ?1 AND source_epoch < ?2 AND target_epoch < ?3", - params![validator_id, new_min_source, new_min_target], + WHERE + validator_id = ?1 AND + target_epoch < ?2 AND + target_epoch < (SELECT MAX(target_epoch) + FROM signed_attestations + WHERE validator_id = ?1)", + params![validator_id, new_min_target], )?; Ok(()) } + /// Prune the signed attestations table for the given validator keys. 
+ pub fn prune_all_signed_attestations<'a>( + &self, + mut public_keys: impl Iterator, + new_min_target: Epoch, + ) -> Result<(), NotSafe> { + let mut conn = self.conn_pool.get()?; + let txn = conn.transaction()?; + public_keys + .try_for_each(|pubkey| self.prune_signed_attestations(pubkey, new_min_target, &txn))?; + txn.commit()?; + Ok(()) + } + pub fn num_validator_rows(&self) -> Result { let mut conn = self.conn_pool.get()?; let txn = conn.transaction()?; diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 97d2496b1..1a1b8ae84 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -222,6 +222,11 @@ impl AttestationService { ); }); + // Schedule pruning of the slashing protection database once all unaggregated + // attestations have (hopefully) been signed, i.e. at the same time as aggregate + // production. + self.spawn_slashing_protection_pruning_task(slot, aggregate_production_instant); + Ok(()) } @@ -566,6 +571,32 @@ impl AttestationService { Ok(()) } + + /// Spawn a blocking task to run the slashing protection pruning process. + /// + /// Start the task at `pruning_instant` to avoid interference with other tasks. + fn spawn_slashing_protection_pruning_task(&self, slot: Slot, pruning_instant: Instant) { + let attestation_service = self.clone(); + let executor = self.inner.context.executor.clone(); + let current_epoch = slot.epoch(E::slots_per_epoch()); + + // Wait for `pruning_instant` in a regular task, and then switch to a blocking one. 
+ self.inner.context.executor.spawn( + async move { + sleep_until(pruning_instant).await; + + executor.spawn_blocking( + move || { + attestation_service + .validator_store + .prune_slashing_protection_db(current_epoch, false) + }, + "slashing_protection_pruning", + ) + }, + "slashing_protection_pre_pruning", + ); + } } #[cfg(test)] diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index d60592f1b..5a6b993ab 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -65,6 +65,10 @@ lazy_static::lazy_static! { "Duration to perform attestation service tasks", &["task"] ); + pub static ref SLASHING_PROTECTION_PRUNE_TIMES: Result = try_create_histogram( + "vc_slashing_protection_prune_times_seconds", + "Time required to prune the slashing protection DB", + ); pub static ref BLOCK_SERVICE_TIMES: Result = try_create_histogram_vec( "vc_beacon_block_service_task_times_seconds", "Duration to perform beacon block service tasks", diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 00f48538a..edc873efe 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -277,6 +277,13 @@ impl ProductionValidatorClient { "voting_validators" => validator_store.num_voting_validators() ); + // Perform pruning of the slashing protection database on start-up. In case the database is + // oversized from having not been pruned (by a prior version) we don't want to prune + // concurrently, as it will hog the lock and cause the attestation service to spew CRITs. 
+ if let Some(slot) = slot_clock.now() { + validator_store.prune_slashing_protection_db(slot.epoch(T::slots_per_epoch()), true); + } + let duties_service = DutiesServiceBuilder::new() .slot_clock(slot_clock.clone()) .validator_store(validator_store.clone()) diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 63b752e10..f152f411f 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -2,9 +2,9 @@ use crate::{ fork_service::ForkService, http_metrics::metrics, initialized_validators::InitializedValidators, }; use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString}; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use slashing_protection::{NotSafe, Safe, SlashingDatabase}; -use slog::{crit, error, warn, Logger}; +use slog::{crit, error, info, warn, Logger}; use slot_clock::SlotClock; use std::path::Path; use std::sync::Arc; @@ -15,6 +15,11 @@ use types::{ }; use validator_dir::ValidatorDir; +/// Number of epochs of slashing protection history to keep. +/// +/// This acts as a maximum safe-guard against clock drift. +const SLASHING_PROTECTION_HISTORY_EPOCHS: u64 = 512; + struct LocalValidator { validator_dir: ValidatorDir, voting_keypair: Keypair, @@ -44,6 +49,7 @@ impl PartialEq for LocalValidator { pub struct ValidatorStore { validators: Arc>, slashing_protection: SlashingDatabase, + slashing_protection_last_prune: Arc>, genesis_validators_root: Hash256, spec: Arc, log: Logger, @@ -63,6 +69,7 @@ impl ValidatorStore { Self { validators: Arc::new(RwLock::new(validators)), slashing_protection, + slashing_protection_last_prune: Arc::new(Mutex::new(Epoch::new(0))), genesis_validators_root, spec: Arc::new(spec), log, @@ -359,4 +366,65 @@ impl ValidatorStore { &self.spec, )) } + + /// Prune the slashing protection database so that it remains performant. 
+ /// + /// This function will only do actual pruning periodically, so it should usually be + /// cheap to call. The `first_run` flag can be used to print a more verbose message when pruning + /// runs. + pub fn prune_slashing_protection_db(&self, current_epoch: Epoch, first_run: bool) { + // Attempt to prune every SLASHING_PROTECTION_HISTORY_EPOCHs, with a tolerance for + // missing the epoch that aligns exactly. + let mut last_prune = self.slashing_protection_last_prune.lock(); + if current_epoch / SLASHING_PROTECTION_HISTORY_EPOCHS + <= *last_prune / SLASHING_PROTECTION_HISTORY_EPOCHS + { + return; + } + + if first_run { + info!( + self.log, + "Pruning slashing protection DB"; + "epoch" => current_epoch, + "msg" => "pruning may take several minutes the first time it runs" + ); + } else { + info!(self.log, "Pruning slashing protection DB"; "epoch" => current_epoch); + } + + let _timer = metrics::start_timer(&metrics::SLASHING_PROTECTION_PRUNE_TIMES); + + let new_min_target_epoch = current_epoch.saturating_sub(SLASHING_PROTECTION_HISTORY_EPOCHS); + let new_min_slot = new_min_target_epoch.start_slot(E::slots_per_epoch()); + + let validators = self.validators.read(); + if let Err(e) = self + .slashing_protection + .prune_all_signed_attestations(validators.iter_voting_pubkeys(), new_min_target_epoch) + { + error!( + self.log, + "Error during pruning of signed attestations"; + "error" => ?e, + ); + return; + } + + if let Err(e) = self + .slashing_protection + .prune_all_signed_blocks(validators.iter_voting_pubkeys(), new_min_slot) + { + error!( + self.log, + "Error during pruning of signed blocks"; + "error" => ?e, + ); + return; + } + + *last_prune = current_epoch; + + info!(self.log, "Completed pruning of slashing protection DB"); + } }