Add commmand for pruning states (#4835)

## Issue Addressed

Closes #4481. 

(Continuation of #4648)

## Proposed Changes

- [x] Add `lighthouse db prune-states`
- [x] Make it work
- [x] Ensure block roots are handled correctly (to be addressed in 4735)
- [x] Check perf on mainnet/Goerli/Gnosis (takes a few seconds max)
- [x] Run block root healing logic (#4875 ) at the beginning
- [x] Add some tests
- [x] Update docs
- [x] Add `--freezer` flag and other improvements to `lighthouse db inspect`

Co-authored-by: Michael Sproul <michael@sigmaprime.io>
Co-authored-by: Jimmy Chen <jimmy@sigmaprime.io>
Co-authored-by: Michael Sproul <micsproul@gmail.com>
This commit is contained in:
Jimmy Chen 2023-11-03 00:12:19 +00:00
parent 07f53b18fc
commit 36d8849813
8 changed files with 373 additions and 29 deletions

View File

@ -27,7 +27,7 @@ use std::collections::HashSet;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION, STATE_UPPER_LIMIT_NO_RETAIN};
use store::{
chunked_vector::{chunk_key, Field},
get_key_for_col,
@ -3306,6 +3306,77 @@ fn check_blob_existence(
}
}
#[tokio::test]
async fn prune_historic_states() {
let num_blocks_produced = E::slots_per_epoch() * 5;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let genesis_state_root = harness.chain.genesis_state_root;
let genesis_state = harness
.chain
.get_state(&genesis_state_root, None)
.unwrap()
.unwrap();
harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Check historical state is present.
let state_roots_iter = harness
.chain
.forwards_iter_state_roots(Slot::new(0))
.unwrap();
for (state_root, slot) in state_roots_iter
.take(E::slots_per_epoch() as usize)
.map(Result::unwrap)
{
assert!(store.get_state(&state_root, Some(slot)).unwrap().is_some());
}
store
.prune_historic_states(genesis_state_root, &genesis_state)
.unwrap();
// Check that anchor info is updated.
let anchor_info = store.get_anchor_info().unwrap();
assert_eq!(anchor_info.state_lower_limit, 0);
assert_eq!(anchor_info.state_upper_limit, STATE_UPPER_LIMIT_NO_RETAIN);
// Historical states should be pruned.
let state_roots_iter = harness
.chain
.forwards_iter_state_roots(Slot::new(1))
.unwrap();
for (state_root, slot) in state_roots_iter
.take(E::slots_per_epoch() as usize)
.map(Result::unwrap)
{
assert!(store.get_state(&state_root, Some(slot)).unwrap().is_none());
}
// Ensure that genesis state is still accessible
let genesis_state_root = harness.chain.genesis_state_root;
assert!(store
.get_state(&genesis_state_root, Some(Slot::new(0)))
.unwrap()
.is_some());
// Run for another two epochs.
let additional_blocks_produced = 2 * E::slots_per_epoch();
harness
.extend_slots(additional_blocks_produced as usize)
.await;
check_finalization(&harness, num_blocks_produced + additional_blocks_produced);
check_split_slot(&harness, store);
}
/// Checks that two chains are the same, for the purpose of these tests.
///
/// Several fields that are hard/impossible to check are ignored (e.g., the store).

View File

@ -2222,6 +2222,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}
/// This function fills in missing block roots between last restore point slot and split
/// slot, if any.
pub fn heal_freezer_block_roots(&self) -> Result<(), Error> {
let split = self.get_split_info();
let last_restore_point_slot = (split.slot - 1) / self.config.slots_per_restore_point
@ -2250,6 +2252,93 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}
/// Delete *all* states from the freezer database and update the anchor accordingly.
///
/// WARNING: this method deletes the genesis state and replaces it with the provided
/// `genesis_state`. This is to support its use in schema migrations where the storage scheme of
/// the genesis state may be modified. It is the responsibility of the caller to ensure that the
/// genesis state is correct, else a corrupt database will be created.
pub fn prune_historic_states(
&self,
genesis_state_root: Hash256,
genesis_state: &BeaconState<E>,
) -> Result<(), Error> {
// Make sure there is no missing block roots before pruning
self.heal_freezer_block_roots()?;
// Update the anchor to use the dummy state upper limit and disable historic state storage.
let old_anchor = self.get_anchor_info();
let new_anchor = if let Some(old_anchor) = old_anchor.clone() {
AnchorInfo {
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
..old_anchor.clone()
}
} else {
AnchorInfo {
anchor_slot: Slot::new(0),
oldest_block_slot: Slot::new(0),
oldest_block_parent: Hash256::zero(),
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
}
};
// Commit the anchor change immediately: if the cold database ops fail they can always be
// retried, and we can't do them atomically with this change anyway.
self.compare_and_set_anchor_info_with_write(old_anchor, Some(new_anchor))?;
// Stage freezer data for deletion. Do not bother loading and deserializing values as this
// wastes time and is less schema-agnostic. My hope is that this method will be useful for
// migrating to the tree-states schema (delete everything in the freezer then start afresh).
let mut cold_ops = vec![];
let columns = [
DBColumn::BeaconState,
DBColumn::BeaconStateSummary,
DBColumn::BeaconRestorePoint,
DBColumn::BeaconStateRoots,
DBColumn::BeaconHistoricalRoots,
DBColumn::BeaconRandaoMixes,
DBColumn::BeaconHistoricalSummaries,
];
for column in columns {
for res in self.cold_db.iter_column_keys::<Vec<u8>>(column) {
let key = res?;
cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
column.as_str(),
&key,
)));
}
}
// XXX: We need to commit the mass deletion here *before* re-storing the genesis state, as
// the current schema performs reads as part of `store_cold_state`. This can be deleted
// once the target schema is tree-states. If the process is killed before the genesis state
// is written this can be fixed by re-running.
info!(
self.log,
"Deleting historic states";
"num_kv" => cold_ops.len(),
);
self.cold_db.do_atomically(std::mem::take(&mut cold_ops))?;
// If we just deleted the the genesis state, re-store it using the *current* schema, which
// may be different from the schema of the genesis state we just deleted.
if self.get_split_slot() > 0 {
info!(
self.log,
"Re-storing genesis state";
"state_root" => ?genesis_state_root,
);
self.store_cold_state(&genesis_state_root, genesis_state, &mut cold_ops)?;
self.cold_db.do_atomically(cold_ops)?;
}
Ok(())
}
}
/// Advance the split point of the store, moving new finalized states to the freezer.

View File

@ -169,6 +169,7 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
for (start_key, end_key) in [
endpoints(DBColumn::BeaconStateTemporary),
endpoints(DBColumn::BeaconState),
endpoints(DBColumn::BeaconStateSummary),
] {
self.db.compact(&start_key, &end_key);
}
@ -225,9 +226,9 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
}
/// Iterate through all keys and values in a particular column.
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K> {
let start_key =
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
BytesKey::from_vec(get_key_for_col(column.into(), &vec![0; column.key_size()]));
let iter = self.db.keys_iter(self.read_options());
iter.seek(&start_key);
@ -235,13 +236,12 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
Box::new(
iter.take_while(move |key| key.matches_column(column))
.map(move |bytes_key| {
let key =
bytes_key
.remove_column(column)
.ok_or(HotColdDBError::IterationError {
unexpected_key: bytes_key,
let key = bytes_key.remove_column_variable(column).ok_or_else(|| {
HotColdDBError::IterationError {
unexpected_key: bytes_key.clone(),
}
})?;
Ok(key)
K::from_bytes(key)
}),
)
}

View File

@ -45,7 +45,7 @@ use strum::{EnumString, IntoStaticStr};
pub use types::*;
pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
pub type ColumnKeyIter<'a> = Box<dyn Iterator<Item = Result<Hash256, Error>> + 'a>;
pub type ColumnKeyIter<'a, K> = Box<dyn Iterator<Item = Result<K, Error>> + 'a>;
pub type RawEntryIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>), Error>> + 'a>;
pub type RawKeyIter<'a> = Box<dyn Iterator<Item = Result<Vec<u8>, Error>> + 'a>;
@ -88,6 +88,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
self.iter_column_from(column, &vec![0; column.key_size()])
}
/// Iterate through all keys and values in a column from a given starting point.
fn iter_column_from<K: Key>(&self, column: DBColumn, from: &[u8]) -> ColumnIter<K>;
fn iter_raw_entries(&self, _column: DBColumn, _prefix: &[u8]) -> RawEntryIter {
@ -99,7 +100,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
}
/// Iterate through all keys in a particular column.
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter;
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K>;
}
pub trait Key: Sized + 'static {
@ -274,7 +275,7 @@ impl DBColumn {
/// This function returns the number of bytes used by keys in a given column.
pub fn key_size(self) -> usize {
match self {
Self::OverflowLRUCache => 40,
Self::OverflowLRUCache => 33, // See `OverflowKey` encode impl.
Self::BeaconMeta
| Self::BeaconBlock
| Self::BeaconState

View File

@ -100,7 +100,7 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
}))
}
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K> {
Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k)))
}

View File

@ -158,3 +158,38 @@ lighthouse db version --network mainnet
```
[run-correctly]: #how-to-run-lighthouse-db-correctly
## How to prune historic states
Pruning historic states helps in managing the disk space used by the Lighthouse beacon node by removing old beacon
states from the freezer database. This can be especially useful when the database has accumulated a significant amount
of historic data. This command is intended for nodes synced before 4.4.1, as newly synced node no longer store
historic states by default.
Here are the steps to prune historic states:
1. Before running the prune command, make sure that the Lighthouse beacon node is not running. If you are using systemd, you might stop the Lighthouse beacon node with a command like:
```bash
sudo systemctl stop lighthousebeacon
```
2. Use the `prune-states` command to prune the historic states. You can do a test run without the `--confirm` flag to check that the database can be pruned:
```bash
sudo -u "$LH_USER" lighthouse db prune-states --datadir "$LH_DATADIR" --network "$NET"
```
3. If you are ready to prune the states irreversibly, add the `--confirm` flag to commit the changes:
```bash
sudo -u "$LH_USER" lighthouse db prune-states --confirm --datadir "$LH_DATADIR" --network "$NET"
```
The `--confirm` flag ensures that you are aware the action is irreversible, and historic states will be permanently removed.
4. After successfully pruning the historic states, you can restart the Lighthouse beacon node:
```bash
sudo systemctl start lighthousebeacon
```

View File

@ -92,8 +92,7 @@ impl ForkChoiceTest {
T: Fn(&BeaconForkChoiceStore<E, MemoryStore<E>, MemoryStore<E>>) -> U,
{
func(
&self
.harness
self.harness
.chain
.canonical_head
.fork_choice_read_lock()
@ -386,8 +385,7 @@ impl ForkChoiceTest {
&self.harness.chain.spec,
self.harness.logger(),
)
.err()
.expect("on_block did not return an error");
.expect_err("on_block did not return an error");
comparison_func(err);
self
}
@ -841,7 +839,7 @@ async fn valid_attestation() {
.apply_attestation_to_chain(
MutationDelay::NoDelay,
|_, _| {},
|result| assert_eq!(result.unwrap(), ()),
|result| assert!(result.is_ok()),
)
.await;
}
@ -1074,7 +1072,7 @@ async fn invalid_attestation_delayed_slot() {
.apply_attestation_to_chain(
MutationDelay::NoDelay,
|_, _| {},
|result| assert_eq!(result.unwrap(), ()),
|result| assert!(result.is_ok()),
)
.await
.inspect_queued_attestations(|queue| assert_eq!(queue.len(), 1))
@ -1183,7 +1181,7 @@ async fn weak_subjectivity_check_fails_early_epoch() {
let mut checkpoint = setup_harness.harness.finalized_checkpoint();
checkpoint.epoch = checkpoint.epoch - 1;
checkpoint.epoch -= 1;
let chain_config = ChainConfig {
weak_subjectivity_checkpoint: Some(checkpoint),
@ -1210,7 +1208,7 @@ async fn weak_subjectivity_check_fails_late_epoch() {
let mut checkpoint = setup_harness.harness.finalized_checkpoint();
checkpoint.epoch = checkpoint.epoch + 1;
checkpoint.epoch += 1;
let chain_config = ChainConfig {
weak_subjectivity_checkpoint: Some(checkpoint),

View File

@ -5,17 +5,18 @@ use beacon_chain::{
use beacon_node::{get_data_dir, get_slots_per_restore_point, ClientConfig};
use clap::{App, Arg, ArgMatches};
use environment::{Environment, RuntimeContext};
use slog::{info, Logger};
use slog::{info, warn, Logger};
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use store::metadata::STATE_UPPER_LIMIT_NO_RETAIN;
use store::{
errors::Error,
metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION},
DBColumn, HotColdDB, KeyValueStore, LevelDB,
};
use strum::{EnumString, EnumVariantNames, VariantNames};
use types::EthSpec;
use types::{BeaconState, EthSpec, Slot};
pub const CMD: &str = "database_manager";
@ -88,17 +89,35 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> {
}
pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> {
App::new("prune_payloads")
App::new("prune-payloads")
.alias("prune_payloads")
.setting(clap::AppSettings::ColoredHelp)
.about("Prune finalized execution payloads")
}
pub fn prune_blobs_app<'a, 'b>() -> App<'a, 'b> {
App::new("prune_blobs")
App::new("prune-blobs")
.alias("prune_blobs")
.setting(clap::AppSettings::ColoredHelp)
.about("Prune blobs older than data availability boundary")
}
pub fn prune_states_app<'a, 'b>() -> App<'a, 'b> {
App::new("prune-states")
.alias("prune_states")
.arg(
Arg::with_name("confirm")
.long("confirm")
.help(
"Commit to pruning states irreversably. Without this flag the command will \
just check that the database is capable of being pruned.",
)
.takes_value(false),
)
.setting(clap::AppSettings::ColoredHelp)
.about("Prune all beacon states from the freezer database")
}
pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
App::new(CMD)
.visible_aliases(&["db"])
@ -145,6 +164,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.subcommand(inspect_cli_app())
.subcommand(prune_payloads_app())
.subcommand(prune_blobs_app())
.subcommand(prune_states_app())
}
fn parse_client_config<E: EthSpec>(
@ -213,7 +233,7 @@ pub fn display_db_version<E: EthSpec>(
Ok(())
}
#[derive(Debug, EnumString, EnumVariantNames)]
#[derive(Debug, PartialEq, Eq, EnumString, EnumVariantNames)]
pub enum InspectTarget {
#[strum(serialize = "sizes")]
ValueSizes,
@ -221,6 +241,8 @@ pub enum InspectTarget {
ValueTotal,
#[strum(serialize = "values")]
Values,
#[strum(serialize = "gaps")]
Gaps,
}
pub struct InspectConfig {
@ -286,6 +308,9 @@ pub fn inspect_db<E: EthSpec>(
let skip = inspect_config.skip.unwrap_or(0);
let limit = inspect_config.limit.unwrap_or(usize::MAX);
let mut prev_key = 0;
let mut found_gaps = false;
let base_path = &inspect_config.output_dir;
if let InspectTarget::Values = inspect_config.target {
@ -304,6 +329,23 @@ pub fn inspect_db<E: EthSpec>(
InspectTarget::ValueSizes => {
println!("{}: {} bytes", hex::encode(&key), value.len());
}
InspectTarget::Gaps => {
// Convert last 8 bytes of key to u64.
let numeric_key = u64::from_be_bytes(
key[key.len() - 8..]
.try_into()
.expect("key is at least 8 bytes"),
);
if numeric_key > prev_key + 1 {
println!(
"gap between keys {} and {} (offset: {})",
prev_key, numeric_key, num_keys,
);
found_gaps = true;
}
prev_key = numeric_key;
}
InspectTarget::ValueTotal => (),
InspectTarget::Values => {
let file_path = base_path.join(format!(
@ -332,6 +374,10 @@ pub fn inspect_db<E: EthSpec>(
num_keys += 1;
}
if inspect_config.target == InspectTarget::Gaps && !found_gaps {
println!("No gaps found!");
}
println!("Num keys: {}", num_keys);
println!("Total: {} bytes", total);
@ -442,6 +488,86 @@ pub fn prune_blobs<E: EthSpec>(
db.try_prune_most_blobs(true)
}
pub struct PruneStatesConfig {
confirm: bool,
}
fn parse_prune_states_config(cli_args: &ArgMatches) -> Result<PruneStatesConfig, String> {
let confirm = cli_args.is_present("confirm");
Ok(PruneStatesConfig { confirm })
}
pub fn prune_states<E: EthSpec>(
client_config: ClientConfig,
prune_config: PruneStatesConfig,
mut genesis_state: BeaconState<E>,
runtime_context: &RuntimeContext<E>,
log: Logger,
) -> Result<(), String> {
let spec = &runtime_context.eth2_config.spec;
let hot_path = client_config.get_db_path();
let cold_path = client_config.get_freezer_db_path();
let blobs_path = client_config.get_blobs_db_path();
let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
&hot_path,
&cold_path,
blobs_path,
|_, _, _| Ok(()),
client_config.store,
spec.clone(),
log.clone(),
)
.map_err(|e| format!("Unable to open database: {e:?}"))?;
// Load the genesis state from the database to ensure we're deleting states for the
// correct network, and that we don't end up storing the wrong genesis state.
let genesis_from_db = db
.load_cold_state_by_slot(Slot::new(0))
.map_err(|e| format!("Error reading genesis state: {e:?}"))?
.ok_or("Error: genesis state missing from database. Check schema version.")?;
if genesis_from_db.genesis_validators_root() != genesis_state.genesis_validators_root() {
return Err(format!(
"Error: Wrong network. Genesis state in DB does not match {} genesis.",
spec.config_name.as_deref().unwrap_or("<unknown network>")
));
}
// Check that the user has confirmed they want to proceed.
if !prune_config.confirm {
match db.get_anchor_info() {
Some(anchor_info) if anchor_info.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN => {
info!(log, "States have already been pruned");
return Ok(());
}
_ => {
info!(log, "Ready to prune states");
}
}
warn!(
log,
"Pruning states is irreversible";
);
warn!(
log,
"Re-run this command with --confirm to commit to state deletion"
);
info!(log, "Nothing has been pruned on this run");
return Err("Error: confirmation flag required".into());
}
// Delete all historic state data and *re-store* the genesis state.
let genesis_state_root = genesis_state
.update_tree_hash_cache()
.map_err(|e| format!("Error computing genesis state root: {e:?}"))?;
db.prune_historic_states(genesis_state_root, &genesis_state)
.map_err(|e| format!("Failed to prune due to error: {e:?}"))?;
info!(log, "Historic states pruned successfully");
Ok(())
}
/// Run the database manager, returning an error string if the operation did not succeed.
pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, env: Environment<T>) -> Result<(), String> {
let client_config = parse_client_config(cli_args, &env)?;
@ -461,10 +587,34 @@ pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, env: Environment<T>) -> Result
let inspect_config = parse_inspect_config(cli_args)?;
inspect_db(inspect_config, client_config, &context, log)
}
("prune_payloads", Some(_)) => {
("prune-payloads", Some(_)) => {
prune_payloads(client_config, &context, log).map_err(format_err)
}
("prune_blobs", Some(_)) => prune_blobs(client_config, &context, log).map_err(format_err),
("prune-blobs", Some(_)) => prune_blobs(client_config, &context, log).map_err(format_err),
("prune-states", Some(cli_args)) => {
let executor = env.core_context().executor;
let network_config = context
.eth2_network_config
.clone()
.ok_or("Missing network config")?;
let genesis_state = executor
.block_on_dangerous(
network_config.genesis_state::<T>(
client_config.genesis_state_url.as_deref(),
client_config.genesis_state_url_timeout,
&log,
),
"get_genesis_state",
)
.ok_or("Shutting down")?
.map_err(|e| format!("Error getting genesis state: {e}"))?
.ok_or("Genesis state missing")?;
let prune_config = parse_prune_states_config(cli_args)?;
prune_states(client_config, prune_config, genesis_state, &context, log)
}
_ => Err("Unknown subcommand, for help `lighthouse database_manager --help`".into()),
}
}