diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs
index a578629b6..45375ec01 100644
--- a/beacon_node/beacon_chain/src/builder.rs
+++ b/beacon_node/beacon_chain/src/builder.rs
@@ -266,6 +266,13 @@ where
         self.genesis_time = Some(genesis_state.genesis_time());
 
+        // Prune finalized execution payloads.
+        if store.get_config().prune_payloads_on_init {
+            store
+                .try_prune_execution_payloads(false)
+                .map_err(|e| format!("Error pruning execution payloads: {e:?}"))?;
+        }
+
         self.op_pool = Some(
             store
                 .get_item::<PersistedOperationPool<TEthSpec>>(&OP_POOL_DB_KEY)
diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs
index c2283321c..17c84bd69 100644
--- a/beacon_node/beacon_chain/tests/block_verification.rs
+++ b/beacon_node/beacon_chain/tests/block_verification.rs
@@ -41,28 +41,27 @@ async fn get_chain_segment() -> Vec<BeaconSnapshot<E>> {
         )
         .await;
 
-    harness
+    let mut segment = Vec::with_capacity(CHAIN_SEGMENT_LENGTH);
+    for snapshot in harness
         .chain
         .chain_dump()
         .expect("should dump chain")
         .into_iter()
-        .map(|snapshot| {
-            let full_block = harness
-                .chain
-                .store
-                .make_full_block(
-                    &snapshot.beacon_block_root,
-                    snapshot.beacon_block.as_ref().clone(),
-                )
-                .unwrap();
-            BeaconSnapshot {
-                beacon_block_root: snapshot.beacon_block_root,
-                beacon_block: Arc::new(full_block),
-                beacon_state: snapshot.beacon_state,
-            }
-        })
         .skip(1)
-        .collect()
+    {
+        let full_block = harness
+            .chain
+            .get_block(&snapshot.beacon_block_root)
+            .await
+            .unwrap()
+            .unwrap();
+        segment.push(BeaconSnapshot {
+            beacon_block_root: snapshot.beacon_block_root,
+            beacon_block: Arc::new(full_block),
+            beacon_state: snapshot.beacon_state,
+        });
+    }
+    segment
 }
 
 fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessType<E>> {
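Note on the `get_chain_segment` refactor above: `BeaconChain::get_block` is async, and an `Iterator::map` closure cannot `.await`, so the snapshot conversion has to become an explicit loop that pushes into a pre-allocated `Vec`. A minimal sketch of the same pattern, assuming a Tokio runtime, with a stand-in `fetch` function in place of `get_block`:

```rust
// `fetch` and the `u64` roots are illustrative stand-ins, not Lighthouse types.
async fn fetch(root: u64) -> Option<String> {
    Some(format!("block-{root}"))
}

#[tokio::main]
async fn main() {
    let roots = vec![1u64, 2, 3];

    // Does not compile: `.await` is not allowed inside a plain closure.
    // let blocks: Vec<_> = roots.iter().map(|r| fetch(*r).await).collect();

    // The imperative equivalent used by the patched `get_chain_segment`.
    let mut blocks = Vec::with_capacity(roots.len());
    for root in roots {
        blocks.push(fetch(root).await.unwrap());
    }
    assert_eq!(blocks, ["block-1", "block-2", "block-3"]);
}
```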
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index afd97750a..b85ff50ef 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -2114,14 +2114,16 @@ async fn weak_subjectivity_sync() {
     assert_eq!(new_blocks[0].beacon_block.slot(), wss_slot + 1);
 
     for snapshot in new_blocks {
-        let block = &snapshot.beacon_block;
         let full_block = harness
             .chain
-            .store
-            .make_full_block(&snapshot.beacon_block_root, block.as_ref().clone())
+            .get_block(&snapshot.beacon_block_root)
+            .await
+            .unwrap()
             .unwrap();
+        let slot = full_block.slot();
+        let state_root = full_block.state_root();
 
-        beacon_chain.slot_clock.set_slot(block.slot().as_u64());
+        beacon_chain.slot_clock.set_slot(slot.as_u64());
         beacon_chain
             .process_block(Arc::new(full_block), CountUnrealized::True)
             .await
@@ -2129,10 +2131,9 @@
         beacon_chain.recompute_head_at_current_slot().await;
 
         // Check that the new block's state can be loaded correctly.
-        let state_root = block.state_root();
         let mut state = beacon_chain
             .store
-            .get_state(&state_root, Some(block.slot()))
+            .get_state(&state_root, Some(slot))
             .unwrap()
             .unwrap();
         assert_eq!(state.update_tree_hash_cache().unwrap(), state_root);
@@ -2583,6 +2584,7 @@ fn check_split_slot(harness: &TestHarness, store: Arc<HotColdDB<E, LevelDB<E>, L
 
 /// Check that all the states in a chain dump have the correct tree hash.
 fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
     let chain_dump = harness.chain.chain_dump().unwrap();
+    let split_slot = harness.chain.store.get_split_slot();
 
     assert_eq!(chain_dump.len() as u64, expected_len);
 
@@ -2606,6 +2608,21 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
                 .slot(),
             checkpoint.beacon_state.slot()
         );
+
+        // Check presence of execution payload on disk.
+        if harness.chain.spec.bellatrix_fork_epoch.is_some() {
+            assert_eq!(
+                harness
+                    .chain
+                    .store
+                    .execution_payload_exists(&checkpoint.beacon_block_root)
+                    .unwrap(),
+                checkpoint.beacon_block.slot() >= split_slot,
+                "incorrect payload storage for block at slot {}: {:?}",
+                checkpoint.beacon_block.slot(),
+                checkpoint.beacon_block_root,
+            );
+        }
     }
 
     // Check the forwards block roots iterator against the chain dump
diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs
index 3c421a1a3..d9d5c715b 100644
--- a/beacon_node/src/cli.rs
+++ b/beacon_node/src/cli.rs
@@ -515,6 +515,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 .takes_value(true)
                 .default_value("true")
         )
+        .arg(
+            Arg::with_name("prune-payloads-on-startup")
+                .long("prune-payloads-on-startup")
+                .help("Check for execution payloads to prune on start-up.")
+                .takes_value(true)
+                .default_value("true")
+        )
 
         /*
          * Misc.
diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs
index b57ba0268..368fce573 100644
--- a/beacon_node/src/config.rs
+++ b/beacon_node/src/config.rs
@@ -358,6 +358,12 @@ pub fn get_config<E: EthSpec>(
             .map_err(|_| "auto-compact-db takes a boolean".to_string())?;
     }
 
+    if let Some(prune_payloads_on_init) =
+        clap_utils::parse_optional(cli_args, "prune-payloads-on-startup")?
+    {
+        client_config.store.prune_payloads_on_init = prune_payloads_on_init;
+    }
+
     /*
      * Zero-ports
      *
diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs
index 4268ec2e9..9bc9ee8a4 100644
--- a/beacon_node/store/src/config.rs
+++ b/beacon_node/store/src/config.rs
@@ -21,6 +21,8 @@ pub struct StoreConfig {
     pub compact_on_init: bool,
     /// Whether to compact the database during database pruning.
     pub compact_on_prune: bool,
+    /// Whether to try pruning execution payloads on initialization.
+    pub prune_payloads_on_init: bool,
 }
 
 /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
@@ -43,6 +45,7 @@ impl Default for StoreConfig {
             block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
             compact_on_init: false,
             compact_on_prune: true,
+            prune_payloads_on_init: true,
         }
     }
 }
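The flag plumbing above has three layers: a clap argument taking a literal `true`/`false` (defaulting to `"true"`), a parse step in `get_config` that writes into `client_config.store`, and a `StoreConfig` field whose compiled-in default is also `true`. A simplified, std-only sketch of that flow; the `StoreConfig` struct and `apply_flag` below are stand-ins for the real config type and the `clap_utils::parse_optional` call:

```rust
#[derive(Debug)]
struct StoreConfig {
    prune_payloads_on_init: bool,
}

impl Default for StoreConfig {
    // Matches the patched `Default` impl: pruning on startup is opt-out.
    fn default() -> Self {
        Self {
            prune_payloads_on_init: true,
        }
    }
}

// Stand-in for the `clap_utils::parse_optional` branch in `get_config`:
// only override the default when a value was actually supplied.
fn apply_flag(config: &mut StoreConfig, value: Option<&str>) -> Result<(), String> {
    if let Some(v) = value {
        config.prune_payloads_on_init = v
            .parse()
            .map_err(|_| "prune-payloads-on-startup takes a boolean".to_string())?;
    }
    Ok(())
}

fn main() {
    let mut config = StoreConfig::default();
    assert!(config.prune_payloads_on_init);
    apply_flag(&mut config, Some("false")).unwrap();
    assert!(!config.prune_payloads_on_init);
}
```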
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index c4b4a64a0..80de674e9 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -7,7 +7,7 @@ use crate::config::{
 };
 use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
 use crate::impls::beacon_state::{get_full_state, store_full_state};
-use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
+use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
 use crate::leveldb_store::BytesKey;
 use crate::leveldb_store::LevelDB;
 use crate::memory_store::MemoryStore;
@@ -438,6 +438,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             .ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into())
     }
 
+    /// Check if the execution payload for a block exists on disk.
+    pub fn execution_payload_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
+        self.get_item::<ExecutionPayload<E>>(block_root)
+            .map(|payload| payload.is_some())
+    }
+
     /// Determine whether a block exists in the database.
     pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
         self.hot_db
@@ -1418,6 +1424,93 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             &CompactionTimestamp(compaction_timestamp.as_secs()),
         )
     }
+
+    /// Try to prune all execution payloads, returning early if there is no need to prune.
+    pub fn try_prune_execution_payloads(&self, force: bool) -> Result<(), Error> {
+        let split = self.get_split_info();
+
+        if split.slot == 0 {
+            return Ok(());
+        }
+
+        let bellatrix_fork_slot = if let Some(epoch) = self.spec.bellatrix_fork_epoch {
+            epoch.start_slot(E::slots_per_epoch())
+        } else {
+            return Ok(());
+        };
+
+        // Load the split state so we can backtrack to find execution payloads.
+        let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
+            HotColdDBError::MissingSplitState(split.state_root, split.slot),
+        )?;
+
+        // The finalized block may or may not have its execution payload stored, depending on
+        // whether it was at a skipped slot. However for a fully pruned database its parent
+        // should *always* have been pruned.
+        let split_parent_block_root = split_state.get_block_root(split.slot - 1)?;
+        if !self.execution_payload_exists(split_parent_block_root)? && !force {
+            info!(self.log, "Execution payloads are pruned");
+            return Ok(());
+        }
+
+        // Iterate block roots backwards to the Bellatrix fork or the anchor slot, whichever comes
+        // first.
+        let split_block_root = split_state.get_latest_block_root(split.state_root);
+        let anchor_slot = self.get_anchor_info().map(|info| info.anchor_slot);
+
+        let mut ops = vec![];
+
+        for res in std::iter::once(Ok((split_block_root, split.slot)))
+            .chain(BlockRootsIterator::new(self, &split_state))
+        {
+            let (block_root, slot) = match res {
+                Ok(tuple) => tuple,
+                Err(e) => {
+                    warn!(
+                        self.log,
+                        "Stopping backtrack early";
+                        "error" => ?e,
+                    );
+                    break;
+                }
+            };
+
+            if slot < bellatrix_fork_slot {
+                info!(
+                    self.log,
+                    "Finished backtrack to Bellatrix fork";
+                );
+                break;
+            }
+
+            if self.execution_payload_exists(&block_root)? {
+                debug!(
+                    self.log,
+                    "Pruning execution payload";
+                    "slot" => slot,
+                    "block_root" => ?block_root,
+                );
+                ops.push(StoreOp::DeleteExecutionPayload(block_root));
+            }
+
+            if Some(slot) == anchor_slot {
+                info!(
+                    self.log,
+                    "Finished backtrack to anchor state";
+                    "slot" => slot
+                );
+                break;
+            }
+        }
+        let payloads_pruned = ops.len();
+        self.do_atomically(ops)?;
+        info!(
+            self.log,
+            "Execution payload pruning complete";
+            "payloads_pruned" => payloads_pruned,
+        );
+        Ok(())
+    }
 }
 
 /// Advance the split point of the store, moving new finalized states to the freezer.
@@ -1457,16 +1550,16 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
     let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
 
     // 1. Copy all of the states between the head and the split slot, from the hot DB
-    // to the cold DB.
-    let state_root_iter = StateRootsIterator::new(&store, frozen_head);
-    for maybe_pair in state_root_iter.take_while(|result| match result {
-        Ok((_, slot)) => {
+    // to the cold DB. Delete the execution payloads of these now-finalized blocks.
+    let state_root_iter = RootsIterator::new(&store, frozen_head);
+    for maybe_tuple in state_root_iter.take_while(|result| match result {
+        Ok((_, _, slot)) => {
             slot >= &current_split_slot
                 && anchor_slot.map_or(true, |anchor_slot| slot >= &anchor_slot)
         }
         Err(_) => true,
     }) {
-        let (state_root, slot) = maybe_pair?;
+        let (block_root, state_root, slot) = maybe_tuple?;
 
         let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();
 
@@ -1489,6 +1582,11 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
 
         // Delete the old summary, and the full state if we lie on an epoch boundary.
         hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
+
+        // Delete the execution payload. Even if this execution payload is the payload of the
+        // new finalized block it is OK to delete it, as `try_get_full_block` looks at the split
+        // slot when determining whether to reconstruct payloads.
+        hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
     }
 
     // Warning: Critical section. We have to take care not to put any of the two databases in an
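The heart of `try_prune_execution_payloads` is the backwards walk: starting from the split (finalized) block, queue a `DeleteExecutionPayload` op for every payload still on disk, and stop at the Bellatrix fork slot or at the anchor slot of a checkpoint-synced node, whichever is reached first. A self-contained sketch of that loop over plain data, with `u64` stand-ins for roots and slots (no Lighthouse types):

```rust
use std::collections::HashSet;

/// Walk (block_root, slot) pairs newest-first from the split and collect the
/// roots whose payloads should be deleted. Plain-data model of the real loop.
fn collect_prune_ops(
    chain: &[(u64, u64)], // (block_root, slot), newest first
    stored_payloads: &HashSet<u64>,
    bellatrix_fork_slot: u64,
    anchor_slot: Option<u64>,
) -> Vec<u64> {
    let mut ops = Vec::new();
    for &(block_root, slot) in chain {
        // No payloads exist before the merge; stop once we cross the fork.
        if slot < bellatrix_fork_slot {
            break;
        }
        if stored_payloads.contains(&block_root) {
            ops.push(block_root);
        }
        // A checkpoint-synced node has nothing before its anchor.
        if Some(slot) == anchor_slot {
            break;
        }
    }
    ops
}

fn main() {
    let chain = [(40, 8), (30, 7), (20, 6), (10, 5)];
    let stored: HashSet<u64> = [40, 30, 20, 10].into_iter().collect();
    // Fork at slot 6: the pre-fork block at slot 5 is never touched.
    assert_eq!(collect_prune_ops(&chain, &stored, 6, None), vec![40, 30, 20]);
}
```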
diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs
index 50295df4b..20147adb9 100644
--- a/database_manager/src/lib.rs
+++ b/database_manager/src/lib.rs
@@ -59,6 +59,12 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> {
     )
 }
 
+pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> {
+    App::new("prune_payloads")
+        .setting(clap::AppSettings::ColoredHelp)
+        .about("Prune finalized execution payloads")
+}
+
 pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
     App::new(CMD)
         .visible_aliases(&["db"])
@@ -85,6 +91,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
         .subcommand(migrate_cli_app())
         .subcommand(version_cli_app())
         .subcommand(inspect_cli_app())
+        .subcommand(prune_payloads_app())
 }
 
 fn parse_client_config<E: EthSpec>(
@@ -257,6 +264,30 @@ pub fn migrate_db<E: EthSpec>(
     )
 }
 
+pub fn prune_payloads<E: EthSpec>(
+    client_config: ClientConfig,
+    runtime_context: &RuntimeContext<E>,
+    log: Logger,
+) -> Result<(), Error> {
+    let spec = &runtime_context.eth2_config.spec;
+    let hot_path = client_config.get_db_path();
+    let cold_path = client_config.get_freezer_db_path();
+
+    let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
+        &hot_path,
+        &cold_path,
+        |_, _, _| Ok(()),
+        client_config.store,
+        spec.clone(),
+        log,
+    )?;
+
+    // If we're triggering a prune manually then ignore the check on the split's parent that
+    // bails out early.
+    let force = true;
+    db.try_prune_execution_payloads(force)
+}
+
 /// Run the database manager, returning an error string if the operation did not succeed.
 pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, mut env: Environment<T>) -> Result<(), String> {
     let client_config = parse_client_config(cli_args, &env)?;
@@ -273,6 +304,7 @@ pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, mut env: Environment<T>) -> Re
             let inspect_config = parse_inspect_config(cli_args)?;
             inspect_db(inspect_config, client_config, &context, log)
         }
+        ("prune_payloads", Some(_)) => prune_payloads(client_config, &context, log),
        _ => {
            return Err("Unknown subcommand, for help `lighthouse database_manager --help`".into())
        }
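The `force` parameter separates the two call sites: startup (via the builder) passes `false`, so a database whose split-parent payload is already gone is treated as fully pruned and the walk is skipped, while the new subcommand, invoked as `lighthouse database_manager prune_payloads` (or via the `db` alias), passes `true` to re-walk the whole post-Bellatrix range. A truth-table sketch of that guard, using a hypothetical helper name:

```rust
// Hypothetical helper mirroring the early-exit guard
// `if !self.execution_payload_exists(split_parent_block_root)? && !force`.
fn should_backtrack(split_parent_payload_exists: bool, force: bool) -> bool {
    split_parent_payload_exists || force
}

fn main() {
    // Startup (force = false): an already-pruned database exits early.
    assert!(!should_backtrack(false, false));
    // Manual `prune_payloads` (force = true): always walk.
    assert!(should_backtrack(false, true));
    // Payload still stored at the split's parent: pruning has work to do.
    assert!(should_backtrack(true, false));
}
```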
diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs
index b28c1a0c3..aed8ebf39 100644
--- a/lighthouse/tests/beacon_node.rs
+++ b/lighthouse/tests/beacon_node.rs
@@ -1227,6 +1227,19 @@ fn compact_db_flag() {
         .with_config(|config| assert!(config.store.compact_on_init));
 }
 #[test]
+fn prune_payloads_on_startup_default() {
+    CommandLineTest::new()
+        .run_with_zero_port()
+        .with_config(|config| assert!(config.store.prune_payloads_on_init));
+}
+#[test]
+fn prune_payloads_on_startup_false() {
+    CommandLineTest::new()
+        .flag("prune-payloads-on-startup", Some("false"))
+        .run_with_zero_port()
+        .with_config(|config| assert!(!config.store.prune_payloads_on_init));
+}
+#[test]
 fn reconstruct_historic_states_flag() {
     CommandLineTest::new()
         .flag("reconstruct-historic-states", None)
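Taken together, the patch pins down a single on-disk invariant, asserted directly by `check_chain_dump` above: a block's execution payload is present exactly when the block's slot is at or after the split slot. A one-predicate sketch of that property:

```rust
// The invariant asserted by `check_chain_dump` after pruning: payloads of
// finalized (pre-split) blocks are deleted; payloads at or after the split
// are retained.
fn payload_should_exist(block_slot: u64, split_slot: u64) -> bool {
    block_slot >= split_slot
}

fn main() {
    let split_slot = 32;
    assert!(!payload_should_exist(31, split_slot)); // before the split: pruned
    assert!(payload_should_exist(32, split_slot)); // at or after the split: kept
}
```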