diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index e8bb76778..59a4c1dec 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -27,7 +27,7 @@ use std::collections::HashSet; use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; -use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}; +use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION, STATE_UPPER_LIMIT_NO_RETAIN}; use store::{ chunked_vector::{chunk_key, Field}, get_key_for_col, @@ -3306,6 +3306,77 @@ fn check_blob_existence( } } +#[tokio::test] +async fn prune_historic_states() { + let num_blocks_produced = E::slots_per_epoch() * 5; + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + let genesis_state_root = harness.chain.genesis_state_root; + let genesis_state = harness + .chain + .get_state(&genesis_state_root, None) + .unwrap() + .unwrap(); + + harness + .extend_chain( + num_blocks_produced as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Check historical state is present. + let state_roots_iter = harness + .chain + .forwards_iter_state_roots(Slot::new(0)) + .unwrap(); + for (state_root, slot) in state_roots_iter + .take(E::slots_per_epoch() as usize) + .map(Result::unwrap) + { + assert!(store.get_state(&state_root, Some(slot)).unwrap().is_some()); + } + + store + .prune_historic_states(genesis_state_root, &genesis_state) + .unwrap(); + + // Check that anchor info is updated. + let anchor_info = store.get_anchor_info().unwrap(); + assert_eq!(anchor_info.state_lower_limit, 0); + assert_eq!(anchor_info.state_upper_limit, STATE_UPPER_LIMIT_NO_RETAIN); + + // Historical states should be pruned. + let state_roots_iter = harness + .chain + .forwards_iter_state_roots(Slot::new(1)) + .unwrap(); + for (state_root, slot) in state_roots_iter + .take(E::slots_per_epoch() as usize) + .map(Result::unwrap) + { + assert!(store.get_state(&state_root, Some(slot)).unwrap().is_none()); + } + + // Ensure that genesis state is still accessible + let genesis_state_root = harness.chain.genesis_state_root; + assert!(store + .get_state(&genesis_state_root, Some(Slot::new(0))) + .unwrap() + .is_some()); + + // Run for another two epochs. + let additional_blocks_produced = 2 * E::slots_per_epoch(); + harness + .extend_slots(additional_blocks_produced as usize) + .await; + + check_finalization(&harness, num_blocks_produced + additional_blocks_produced); + check_split_slot(&harness, store); +} + /// Checks that two chains are the same, for the purpose of these tests. /// /// Several fields that are hard/impossible to check are ignored (e.g., the store). diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 640647db3..6e2a2ae58 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2222,6 +2222,8 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } + /// This function fills in missing block roots between last restore point slot and split + /// slot, if any. pub fn heal_freezer_block_roots(&self) -> Result<(), Error> { let split = self.get_split_info(); let last_restore_point_slot = (split.slot - 1) / self.config.slots_per_restore_point @@ -2250,6 +2252,93 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } + + /// Delete *all* states from the freezer database and update the anchor accordingly. + /// + /// WARNING: this method deletes the genesis state and replaces it with the provided + /// `genesis_state`. This is to support its use in schema migrations where the storage scheme of + /// the genesis state may be modified. It is the responsibility of the caller to ensure that the + /// genesis state is correct, else a corrupt database will be created. + pub fn prune_historic_states( + &self, + genesis_state_root: Hash256, + genesis_state: &BeaconState, + ) -> Result<(), Error> { + // Make sure there is no missing block roots before pruning + self.heal_freezer_block_roots()?; + + // Update the anchor to use the dummy state upper limit and disable historic state storage. + let old_anchor = self.get_anchor_info(); + let new_anchor = if let Some(old_anchor) = old_anchor.clone() { + AnchorInfo { + state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, + state_lower_limit: Slot::new(0), + ..old_anchor.clone() + } + } else { + AnchorInfo { + anchor_slot: Slot::new(0), + oldest_block_slot: Slot::new(0), + oldest_block_parent: Hash256::zero(), + state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, + state_lower_limit: Slot::new(0), + } + }; + + // Commit the anchor change immediately: if the cold database ops fail they can always be + // retried, and we can't do them atomically with this change anyway. + self.compare_and_set_anchor_info_with_write(old_anchor, Some(new_anchor))?; + + // Stage freezer data for deletion. Do not bother loading and deserializing values as this + // wastes time and is less schema-agnostic. My hope is that this method will be useful for + // migrating to the tree-states schema (delete everything in the freezer then start afresh). + let mut cold_ops = vec![]; + + let columns = [ + DBColumn::BeaconState, + DBColumn::BeaconStateSummary, + DBColumn::BeaconRestorePoint, + DBColumn::BeaconStateRoots, + DBColumn::BeaconHistoricalRoots, + DBColumn::BeaconRandaoMixes, + DBColumn::BeaconHistoricalSummaries, + ]; + + for column in columns { + for res in self.cold_db.iter_column_keys::>(column) { + let key = res?; + cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col( + column.as_str(), + &key, + ))); + } + } + + // XXX: We need to commit the mass deletion here *before* re-storing the genesis state, as + // the current schema performs reads as part of `store_cold_state`. This can be deleted + // once the target schema is tree-states. If the process is killed before the genesis state + // is written this can be fixed by re-running. + info!( + self.log, + "Deleting historic states"; + "num_kv" => cold_ops.len(), + ); + self.cold_db.do_atomically(std::mem::take(&mut cold_ops))?; + + // If we just deleted the the genesis state, re-store it using the *current* schema, which + // may be different from the schema of the genesis state we just deleted. + if self.get_split_slot() > 0 { + info!( + self.log, + "Re-storing genesis state"; + "state_root" => ?genesis_state_root, + ); + self.store_cold_state(&genesis_state_root, genesis_state, &mut cold_ops)?; + self.cold_db.do_atomically(cold_ops)?; + } + + Ok(()) + } } /// Advance the split point of the store, moving new finalized states to the freezer. diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 0e4cd0e3b..62619dd2c 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -169,6 +169,7 @@ impl KeyValueStore for LevelDB { for (start_key, end_key) in [ endpoints(DBColumn::BeaconStateTemporary), endpoints(DBColumn::BeaconState), + endpoints(DBColumn::BeaconStateSummary), ] { self.db.compact(&start_key, &end_key); } @@ -225,9 +226,9 @@ impl KeyValueStore for LevelDB { } /// Iterate through all keys and values in a particular column. - fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { + fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { let start_key = - BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes())); + BytesKey::from_vec(get_key_for_col(column.into(), &vec![0; column.key_size()])); let iter = self.db.keys_iter(self.read_options()); iter.seek(&start_key); @@ -235,13 +236,12 @@ impl KeyValueStore for LevelDB { Box::new( iter.take_while(move |key| key.matches_column(column)) .map(move |bytes_key| { - let key = - bytes_key - .remove_column(column) - .ok_or(HotColdDBError::IterationError { - unexpected_key: bytes_key, - })?; - Ok(key) + let key = bytes_key.remove_column_variable(column).ok_or_else(|| { + HotColdDBError::IterationError { + unexpected_key: bytes_key.clone(), + } + })?; + K::from_bytes(key) }), ) } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 0f54b4664..eacd28d2d 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -45,7 +45,7 @@ use strum::{EnumString, IntoStaticStr}; pub use types::*; pub type ColumnIter<'a, K> = Box), Error>> + 'a>; -pub type ColumnKeyIter<'a> = Box> + 'a>; +pub type ColumnKeyIter<'a, K> = Box> + 'a>; pub type RawEntryIter<'a> = Box, Vec), Error>> + 'a>; pub type RawKeyIter<'a> = Box, Error>> + 'a>; @@ -88,6 +88,7 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { self.iter_column_from(column, &vec![0; column.key_size()]) } + /// Iterate through all keys and values in a column from a given starting point. fn iter_column_from(&self, column: DBColumn, from: &[u8]) -> ColumnIter; fn iter_raw_entries(&self, _column: DBColumn, _prefix: &[u8]) -> RawEntryIter { @@ -99,7 +100,7 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { } /// Iterate through all keys in a particular column. - fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter; + fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter; } pub trait Key: Sized + 'static { @@ -274,7 +275,7 @@ impl DBColumn { /// This function returns the number of bytes used by keys in a given column. pub fn key_size(self) -> usize { match self { - Self::OverflowLRUCache => 40, + Self::OverflowLRUCache => 33, // See `OverflowKey` encode impl. Self::BeaconMeta | Self::BeaconBlock | Self::BeaconState diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 7a276a1fa..c2e494dce 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -100,7 +100,7 @@ impl KeyValueStore for MemoryStore { })) } - fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { + fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k))) } diff --git a/book/src/database-migrations.md b/book/src/database-migrations.md index 9b60ca2e1..ed7f7d72f 100644 --- a/book/src/database-migrations.md +++ b/book/src/database-migrations.md @@ -158,3 +158,38 @@ lighthouse db version --network mainnet ``` [run-correctly]: #how-to-run-lighthouse-db-correctly + +## How to prune historic states + +Pruning historic states helps in managing the disk space used by the Lighthouse beacon node by removing old beacon +states from the freezer database. This can be especially useful when the database has accumulated a significant amount +of historic data. This command is intended for nodes synced before 4.4.1, as newly synced node no longer store +historic states by default. + +Here are the steps to prune historic states: + +1. Before running the prune command, make sure that the Lighthouse beacon node is not running. If you are using systemd, you might stop the Lighthouse beacon node with a command like: + + ```bash + sudo systemctl stop lighthousebeacon + ``` + +2. Use the `prune-states` command to prune the historic states. You can do a test run without the `--confirm` flag to check that the database can be pruned: + + ```bash + sudo -u "$LH_USER" lighthouse db prune-states --datadir "$LH_DATADIR" --network "$NET" + ``` + +3. If you are ready to prune the states irreversibly, add the `--confirm` flag to commit the changes: + + ```bash + sudo -u "$LH_USER" lighthouse db prune-states --confirm --datadir "$LH_DATADIR" --network "$NET" + ``` + + The `--confirm` flag ensures that you are aware the action is irreversible, and historic states will be permanently removed. + +4. After successfully pruning the historic states, you can restart the Lighthouse beacon node: + + ```bash + sudo systemctl start lighthousebeacon + ``` diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 9d39eb3e3..c21b8ed56 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -92,8 +92,7 @@ impl ForkChoiceTest { T: Fn(&BeaconForkChoiceStore, MemoryStore>) -> U, { func( - &self - .harness + self.harness .chain .canonical_head .fork_choice_read_lock() @@ -386,8 +385,7 @@ impl ForkChoiceTest { &self.harness.chain.spec, self.harness.logger(), ) - .err() - .expect("on_block did not return an error"); + .expect_err("on_block did not return an error"); comparison_func(err); self } @@ -841,7 +839,7 @@ async fn valid_attestation() { .apply_attestation_to_chain( MutationDelay::NoDelay, |_, _| {}, - |result| assert_eq!(result.unwrap(), ()), + |result| assert!(result.is_ok()), ) .await; } @@ -1074,7 +1072,7 @@ async fn invalid_attestation_delayed_slot() { .apply_attestation_to_chain( MutationDelay::NoDelay, |_, _| {}, - |result| assert_eq!(result.unwrap(), ()), + |result| assert!(result.is_ok()), ) .await .inspect_queued_attestations(|queue| assert_eq!(queue.len(), 1)) @@ -1183,7 +1181,7 @@ async fn weak_subjectivity_check_fails_early_epoch() { let mut checkpoint = setup_harness.harness.finalized_checkpoint(); - checkpoint.epoch = checkpoint.epoch - 1; + checkpoint.epoch -= 1; let chain_config = ChainConfig { weak_subjectivity_checkpoint: Some(checkpoint), @@ -1210,7 +1208,7 @@ async fn weak_subjectivity_check_fails_late_epoch() { let mut checkpoint = setup_harness.harness.finalized_checkpoint(); - checkpoint.epoch = checkpoint.epoch + 1; + checkpoint.epoch += 1; let chain_config = ChainConfig { weak_subjectivity_checkpoint: Some(checkpoint), diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 079419aba..93654b8dd 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -5,17 +5,18 @@ use beacon_chain::{ use beacon_node::{get_data_dir, get_slots_per_restore_point, ClientConfig}; use clap::{App, Arg, ArgMatches}; use environment::{Environment, RuntimeContext}; -use slog::{info, Logger}; +use slog::{info, warn, Logger}; use std::fs; use std::io::Write; use std::path::PathBuf; +use store::metadata::STATE_UPPER_LIMIT_NO_RETAIN; use store::{ errors::Error, metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}, DBColumn, HotColdDB, KeyValueStore, LevelDB, }; use strum::{EnumString, EnumVariantNames, VariantNames}; -use types::EthSpec; +use types::{BeaconState, EthSpec, Slot}; pub const CMD: &str = "database_manager"; @@ -88,17 +89,35 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { } pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> { - App::new("prune_payloads") + App::new("prune-payloads") + .alias("prune_payloads") .setting(clap::AppSettings::ColoredHelp) .about("Prune finalized execution payloads") } pub fn prune_blobs_app<'a, 'b>() -> App<'a, 'b> { - App::new("prune_blobs") + App::new("prune-blobs") + .alias("prune_blobs") .setting(clap::AppSettings::ColoredHelp) .about("Prune blobs older than data availability boundary") } +pub fn prune_states_app<'a, 'b>() -> App<'a, 'b> { + App::new("prune-states") + .alias("prune_states") + .arg( + Arg::with_name("confirm") + .long("confirm") + .help( + "Commit to pruning states irreversably. Without this flag the command will \ + just check that the database is capable of being pruned.", + ) + .takes_value(false), + ) + .setting(clap::AppSettings::ColoredHelp) + .about("Prune all beacon states from the freezer database") +} + pub fn cli_app<'a, 'b>() -> App<'a, 'b> { App::new(CMD) .visible_aliases(&["db"]) @@ -145,6 +164,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .subcommand(inspect_cli_app()) .subcommand(prune_payloads_app()) .subcommand(prune_blobs_app()) + .subcommand(prune_states_app()) } fn parse_client_config( @@ -213,7 +233,7 @@ pub fn display_db_version( Ok(()) } -#[derive(Debug, EnumString, EnumVariantNames)] +#[derive(Debug, PartialEq, Eq, EnumString, EnumVariantNames)] pub enum InspectTarget { #[strum(serialize = "sizes")] ValueSizes, @@ -221,6 +241,8 @@ pub enum InspectTarget { ValueTotal, #[strum(serialize = "values")] Values, + #[strum(serialize = "gaps")] + Gaps, } pub struct InspectConfig { @@ -286,6 +308,9 @@ pub fn inspect_db( let skip = inspect_config.skip.unwrap_or(0); let limit = inspect_config.limit.unwrap_or(usize::MAX); + let mut prev_key = 0; + let mut found_gaps = false; + let base_path = &inspect_config.output_dir; if let InspectTarget::Values = inspect_config.target { @@ -304,6 +329,23 @@ pub fn inspect_db( InspectTarget::ValueSizes => { println!("{}: {} bytes", hex::encode(&key), value.len()); } + InspectTarget::Gaps => { + // Convert last 8 bytes of key to u64. + let numeric_key = u64::from_be_bytes( + key[key.len() - 8..] + .try_into() + .expect("key is at least 8 bytes"), + ); + + if numeric_key > prev_key + 1 { + println!( + "gap between keys {} and {} (offset: {})", + prev_key, numeric_key, num_keys, + ); + found_gaps = true; + } + prev_key = numeric_key; + } InspectTarget::ValueTotal => (), InspectTarget::Values => { let file_path = base_path.join(format!( @@ -332,6 +374,10 @@ pub fn inspect_db( num_keys += 1; } + if inspect_config.target == InspectTarget::Gaps && !found_gaps { + println!("No gaps found!"); + } + println!("Num keys: {}", num_keys); println!("Total: {} bytes", total); @@ -442,6 +488,86 @@ pub fn prune_blobs( db.try_prune_most_blobs(true) } +pub struct PruneStatesConfig { + confirm: bool, +} + +fn parse_prune_states_config(cli_args: &ArgMatches) -> Result { + let confirm = cli_args.is_present("confirm"); + Ok(PruneStatesConfig { confirm }) +} + +pub fn prune_states( + client_config: ClientConfig, + prune_config: PruneStatesConfig, + mut genesis_state: BeaconState, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), String> { + let spec = &runtime_context.eth2_config.spec; + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); + + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + blobs_path, + |_, _, _| Ok(()), + client_config.store, + spec.clone(), + log.clone(), + ) + .map_err(|e| format!("Unable to open database: {e:?}"))?; + + // Load the genesis state from the database to ensure we're deleting states for the + // correct network, and that we don't end up storing the wrong genesis state. + let genesis_from_db = db + .load_cold_state_by_slot(Slot::new(0)) + .map_err(|e| format!("Error reading genesis state: {e:?}"))? + .ok_or("Error: genesis state missing from database. Check schema version.")?; + + if genesis_from_db.genesis_validators_root() != genesis_state.genesis_validators_root() { + return Err(format!( + "Error: Wrong network. Genesis state in DB does not match {} genesis.", + spec.config_name.as_deref().unwrap_or("") + )); + } + + // Check that the user has confirmed they want to proceed. + if !prune_config.confirm { + match db.get_anchor_info() { + Some(anchor_info) if anchor_info.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN => { + info!(log, "States have already been pruned"); + return Ok(()); + } + _ => { + info!(log, "Ready to prune states"); + } + } + warn!( + log, + "Pruning states is irreversible"; + ); + warn!( + log, + "Re-run this command with --confirm to commit to state deletion" + ); + info!(log, "Nothing has been pruned on this run"); + return Err("Error: confirmation flag required".into()); + } + + // Delete all historic state data and *re-store* the genesis state. + let genesis_state_root = genesis_state + .update_tree_hash_cache() + .map_err(|e| format!("Error computing genesis state root: {e:?}"))?; + db.prune_historic_states(genesis_state_root, &genesis_state) + .map_err(|e| format!("Failed to prune due to error: {e:?}"))?; + + info!(log, "Historic states pruned successfully"); + Ok(()) +} + /// Run the database manager, returning an error string if the operation did not succeed. pub fn run(cli_args: &ArgMatches<'_>, env: Environment) -> Result<(), String> { let client_config = parse_client_config(cli_args, &env)?; @@ -461,10 +587,34 @@ pub fn run(cli_args: &ArgMatches<'_>, env: Environment) -> Result let inspect_config = parse_inspect_config(cli_args)?; inspect_db(inspect_config, client_config, &context, log) } - ("prune_payloads", Some(_)) => { + ("prune-payloads", Some(_)) => { prune_payloads(client_config, &context, log).map_err(format_err) } - ("prune_blobs", Some(_)) => prune_blobs(client_config, &context, log).map_err(format_err), + ("prune-blobs", Some(_)) => prune_blobs(client_config, &context, log).map_err(format_err), + ("prune-states", Some(cli_args)) => { + let executor = env.core_context().executor; + let network_config = context + .eth2_network_config + .clone() + .ok_or("Missing network config")?; + + let genesis_state = executor + .block_on_dangerous( + network_config.genesis_state::( + client_config.genesis_state_url.as_deref(), + client_config.genesis_state_url_timeout, + &log, + ), + "get_genesis_state", + ) + .ok_or("Shutting down")? + .map_err(|e| format!("Error getting genesis state: {e}"))? + .ok_or("Genesis state missing")?; + + let prune_config = parse_prune_states_config(cli_args)?; + + prune_states(client_config, prune_config, genesis_state, &context, log) + } _ => Err("Unknown subcommand, for help `lighthouse database_manager --help`".into()), } }