Prune finalized execution payloads (#3565)
## Issue Addressed Closes https://github.com/sigp/lighthouse/issues/3556 ## Proposed Changes Delete finalized execution payloads from the database in two places: 1. When running the finalization migration in `migrate_database`. We delete the finalized payloads between the last split point and the new updated split point. _If_ payloads are already pruned prior to this then this is sufficient to prune _all_ payloads as non-canonical payloads are already deleted by the head pruner, and all canonical payloads prior to the previous split will already have been pruned. 2. To address the fact that users will update to this code _after_ the merge on mainnet (and testnets), we need a one-off scan to delete the finalized payloads from the canonical chain. This is implemented in `try_prune_execution_payloads` which runs on startup and scans the chain back to the Bellatrix fork or the anchor slot (if checkpoint synced after Bellatrix). In the case where payloads are already pruned this check only imposes a single state load for the split state, which shouldn't be _too slow_. Even so, a flag `--prepare-payloads-on-startup=false` is provided to turn this off after it has run the first time, which provides faster start-up times. There is also a new `lighthouse db prune_payloads` subcommand for users who prefer to run the pruning manually. ## Additional Info The tests have been updated to not rely on finalized payloads in the database, instead using the `MockExecutionLayer` to reconstruct them. Additionally a check was added to `check_chain_dump` which asserts the non-existence or existence of payloads on disk depending on their slot.
This commit is contained in:
parent
5b2843c2cd
commit
ca42ef2e5a
@ -266,6 +266,13 @@ where
|
||||
|
||||
self.genesis_time = Some(genesis_state.genesis_time());
|
||||
|
||||
// Prune finalized execution payloads.
|
||||
if store.get_config().prune_payloads_on_init {
|
||||
store
|
||||
.try_prune_execution_payloads(false)
|
||||
.map_err(|e| format!("Error pruning execution payloads: {e:?}"))?;
|
||||
}
|
||||
|
||||
self.op_pool = Some(
|
||||
store
|
||||
.get_item::<PersistedOperationPool<TEthSpec>>(&OP_POOL_DB_KEY)
|
||||
|
@ -41,28 +41,27 @@ async fn get_chain_segment() -> Vec<BeaconSnapshot<E>> {
|
||||
)
|
||||
.await;
|
||||
|
||||
harness
|
||||
let mut segment = Vec::with_capacity(CHAIN_SEGMENT_LENGTH);
|
||||
for snapshot in harness
|
||||
.chain
|
||||
.chain_dump()
|
||||
.expect("should dump chain")
|
||||
.into_iter()
|
||||
.map(|snapshot| {
|
||||
let full_block = harness
|
||||
.chain
|
||||
.store
|
||||
.make_full_block(
|
||||
&snapshot.beacon_block_root,
|
||||
snapshot.beacon_block.as_ref().clone(),
|
||||
)
|
||||
.unwrap();
|
||||
BeaconSnapshot {
|
||||
beacon_block_root: snapshot.beacon_block_root,
|
||||
beacon_block: Arc::new(full_block),
|
||||
beacon_state: snapshot.beacon_state,
|
||||
}
|
||||
})
|
||||
.skip(1)
|
||||
.collect()
|
||||
{
|
||||
let full_block = harness
|
||||
.chain
|
||||
.get_block(&snapshot.beacon_block_root)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
segment.push(BeaconSnapshot {
|
||||
beacon_block_root: snapshot.beacon_block_root,
|
||||
beacon_block: Arc::new(full_block),
|
||||
beacon_state: snapshot.beacon_state,
|
||||
});
|
||||
}
|
||||
segment
|
||||
}
|
||||
|
||||
fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessType<E>> {
|
||||
|
@ -2114,14 +2114,16 @@ async fn weak_subjectivity_sync() {
|
||||
assert_eq!(new_blocks[0].beacon_block.slot(), wss_slot + 1);
|
||||
|
||||
for snapshot in new_blocks {
|
||||
let block = &snapshot.beacon_block;
|
||||
let full_block = harness
|
||||
.chain
|
||||
.store
|
||||
.make_full_block(&snapshot.beacon_block_root, block.as_ref().clone())
|
||||
.get_block(&snapshot.beacon_block_root)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let slot = full_block.slot();
|
||||
let state_root = full_block.state_root();
|
||||
|
||||
beacon_chain.slot_clock.set_slot(block.slot().as_u64());
|
||||
beacon_chain.slot_clock.set_slot(slot.as_u64());
|
||||
beacon_chain
|
||||
.process_block(Arc::new(full_block), CountUnrealized::True)
|
||||
.await
|
||||
@ -2129,10 +2131,9 @@ async fn weak_subjectivity_sync() {
|
||||
beacon_chain.recompute_head_at_current_slot().await;
|
||||
|
||||
// Check that the new block's state can be loaded correctly.
|
||||
let state_root = block.state_root();
|
||||
let mut state = beacon_chain
|
||||
.store
|
||||
.get_state(&state_root, Some(block.slot()))
|
||||
.get_state(&state_root, Some(slot))
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(state.update_tree_hash_cache().unwrap(), state_root);
|
||||
@ -2583,6 +2584,7 @@ fn check_split_slot(harness: &TestHarness, store: Arc<HotColdDB<E, LevelDB<E>, L
|
||||
/// Check that all the states in a chain dump have the correct tree hash.
|
||||
fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
|
||||
let chain_dump = harness.chain.chain_dump().unwrap();
|
||||
let split_slot = harness.chain.store.get_split_slot();
|
||||
|
||||
assert_eq!(chain_dump.len() as u64, expected_len);
|
||||
|
||||
@ -2606,6 +2608,21 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
|
||||
.slot(),
|
||||
checkpoint.beacon_state.slot()
|
||||
);
|
||||
|
||||
// Check presence of execution payload on disk.
|
||||
if harness.chain.spec.bellatrix_fork_epoch.is_some() {
|
||||
assert_eq!(
|
||||
harness
|
||||
.chain
|
||||
.store
|
||||
.execution_payload_exists(&checkpoint.beacon_block_root)
|
||||
.unwrap(),
|
||||
checkpoint.beacon_block.slot() >= split_slot,
|
||||
"incorrect payload storage for block at slot {}: {:?}",
|
||||
checkpoint.beacon_block.slot(),
|
||||
checkpoint.beacon_block_root,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Check the forwards block roots iterator against the chain dump
|
||||
|
@ -515,6 +515,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
||||
.takes_value(true)
|
||||
.default_value("true")
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("prune-payloads-on-startup")
|
||||
.long("prune-payloads-on-startup")
|
||||
.help("Check for execution payloads to prune on start-up.")
|
||||
.takes_value(true)
|
||||
.default_value("true")
|
||||
)
|
||||
|
||||
/*
|
||||
* Misc.
|
||||
|
@ -358,6 +358,12 @@ pub fn get_config<E: EthSpec>(
|
||||
.map_err(|_| "auto-compact-db takes a boolean".to_string())?;
|
||||
}
|
||||
|
||||
if let Some(prune_payloads_on_init) =
|
||||
clap_utils::parse_optional(cli_args, "prune-payloads-on-startup")?
|
||||
{
|
||||
client_config.store.prune_payloads_on_init = prune_payloads_on_init;
|
||||
}
|
||||
|
||||
/*
|
||||
* Zero-ports
|
||||
*
|
||||
|
@ -21,6 +21,8 @@ pub struct StoreConfig {
|
||||
pub compact_on_init: bool,
|
||||
/// Whether to compact the database during database pruning.
|
||||
pub compact_on_prune: bool,
|
||||
/// Whether to try pruning execution payloads on initialization.
|
||||
pub prune_payloads_on_init: bool,
|
||||
}
|
||||
|
||||
/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
|
||||
@ -43,6 +45,7 @@ impl Default for StoreConfig {
|
||||
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
|
||||
compact_on_init: false,
|
||||
compact_on_prune: true,
|
||||
prune_payloads_on_init: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ use crate::config::{
|
||||
};
|
||||
use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
|
||||
use crate::impls::beacon_state::{get_full_state, store_full_state};
|
||||
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
|
||||
use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
|
||||
use crate::leveldb_store::BytesKey;
|
||||
use crate::leveldb_store::LevelDB;
|
||||
use crate::memory_store::MemoryStore;
|
||||
@ -438,6 +438,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
.ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into())
|
||||
}
|
||||
|
||||
/// Check if the execution payload for a block exists on disk.
|
||||
pub fn execution_payload_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
|
||||
self.get_item::<ExecutionPayload<E>>(block_root)
|
||||
.map(|payload| payload.is_some())
|
||||
}
|
||||
|
||||
/// Determine whether a block exists in the database.
|
||||
pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
|
||||
self.hot_db
|
||||
@ -1418,6 +1424,93 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
&CompactionTimestamp(compaction_timestamp.as_secs()),
|
||||
)
|
||||
}
|
||||
|
||||
/// Try to prune all execution payloads, returning early if there is no need to prune.
|
||||
pub fn try_prune_execution_payloads(&self, force: bool) -> Result<(), Error> {
|
||||
let split = self.get_split_info();
|
||||
|
||||
if split.slot == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let bellatrix_fork_slot = if let Some(epoch) = self.spec.bellatrix_fork_epoch {
|
||||
epoch.start_slot(E::slots_per_epoch())
|
||||
} else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Load the split state so we can backtrack to find execution payloads.
|
||||
let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
|
||||
HotColdDBError::MissingSplitState(split.state_root, split.slot),
|
||||
)?;
|
||||
|
||||
// The finalized block may or may not have its execution payload stored, depending on
|
||||
// whether it was at a skipped slot. However for a fully pruned database its parent
|
||||
// should *always* have been pruned.
|
||||
let split_parent_block_root = split_state.get_block_root(split.slot - 1)?;
|
||||
if !self.execution_payload_exists(split_parent_block_root)? && !force {
|
||||
info!(self.log, "Execution payloads are pruned");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Iterate block roots backwards to the Bellatrix fork or the anchor slot, whichever comes
|
||||
// first.
|
||||
let split_block_root = split_state.get_latest_block_root(split.state_root);
|
||||
let anchor_slot = self.get_anchor_info().map(|info| info.anchor_slot);
|
||||
|
||||
let mut ops = vec![];
|
||||
|
||||
for res in std::iter::once(Ok((split_block_root, split.slot)))
|
||||
.chain(BlockRootsIterator::new(self, &split_state))
|
||||
{
|
||||
let (block_root, slot) = match res {
|
||||
Ok(tuple) => tuple,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
self.log,
|
||||
"Stopping backtrack early";
|
||||
"error" => ?e,
|
||||
);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if slot < bellatrix_fork_slot {
|
||||
info!(
|
||||
self.log,
|
||||
"Finished backtrack to Bellatrix fork";
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
if self.execution_payload_exists(&block_root)? {
|
||||
debug!(
|
||||
self.log,
|
||||
"Pruning execution payload";
|
||||
"slot" => slot,
|
||||
"block_root" => ?block_root,
|
||||
);
|
||||
ops.push(StoreOp::DeleteExecutionPayload(block_root));
|
||||
}
|
||||
|
||||
if Some(slot) == anchor_slot {
|
||||
info!(
|
||||
self.log,
|
||||
"Finished backtrack to anchor state";
|
||||
"slot" => slot
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
let payloads_pruned = ops.len();
|
||||
self.do_atomically(ops)?;
|
||||
info!(
|
||||
self.log,
|
||||
"Execution payload pruning complete";
|
||||
"payloads_pruned" => payloads_pruned,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Advance the split point of the store, moving new finalized states to the freezer.
|
||||
@ -1457,16 +1550,16 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
|
||||
|
||||
// 1. Copy all of the states between the head and the split slot, from the hot DB
|
||||
// to the cold DB.
|
||||
let state_root_iter = StateRootsIterator::new(&store, frozen_head);
|
||||
for maybe_pair in state_root_iter.take_while(|result| match result {
|
||||
Ok((_, slot)) => {
|
||||
// to the cold DB. Delete the execution payloads of these now-finalized blocks.
|
||||
let state_root_iter = RootsIterator::new(&store, frozen_head);
|
||||
for maybe_tuple in state_root_iter.take_while(|result| match result {
|
||||
Ok((_, _, slot)) => {
|
||||
slot >= ¤t_split_slot
|
||||
&& anchor_slot.map_or(true, |anchor_slot| slot >= &anchor_slot)
|
||||
}
|
||||
Err(_) => true,
|
||||
}) {
|
||||
let (state_root, slot) = maybe_pair?;
|
||||
let (block_root, state_root, slot) = maybe_tuple?;
|
||||
|
||||
let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();
|
||||
|
||||
@ -1489,6 +1582,11 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
|
||||
// Delete the old summary, and the full state if we lie on an epoch boundary.
|
||||
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
|
||||
|
||||
// Delete the execution payload. Even if this execution payload is the payload of the
|
||||
// new finalized block it is OK to delete it, as `try_get_full_block` looks at the split
|
||||
// slot when determining whether to reconstruct payloads.
|
||||
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
|
||||
}
|
||||
|
||||
// Warning: Critical section. We have to take care not to put any of the two databases in an
|
||||
|
@ -59,6 +59,12 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> {
|
||||
App::new("prune_payloads")
|
||||
.setting(clap::AppSettings::ColoredHelp)
|
||||
.about("Prune finalized execution payloads")
|
||||
}
|
||||
|
||||
pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
||||
App::new(CMD)
|
||||
.visible_aliases(&["db"])
|
||||
@ -85,6 +91,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
||||
.subcommand(migrate_cli_app())
|
||||
.subcommand(version_cli_app())
|
||||
.subcommand(inspect_cli_app())
|
||||
.subcommand(prune_payloads_app())
|
||||
}
|
||||
|
||||
fn parse_client_config<E: EthSpec>(
|
||||
@ -257,6 +264,30 @@ pub fn migrate_db<E: EthSpec>(
|
||||
)
|
||||
}
|
||||
|
||||
pub fn prune_payloads<E: EthSpec>(
|
||||
client_config: ClientConfig,
|
||||
runtime_context: &RuntimeContext<E>,
|
||||
log: Logger,
|
||||
) -> Result<(), Error> {
|
||||
let spec = &runtime_context.eth2_config.spec;
|
||||
let hot_path = client_config.get_db_path();
|
||||
let cold_path = client_config.get_freezer_db_path();
|
||||
|
||||
let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
|
||||
&hot_path,
|
||||
&cold_path,
|
||||
|_, _, _| Ok(()),
|
||||
client_config.store,
|
||||
spec.clone(),
|
||||
log,
|
||||
)?;
|
||||
|
||||
// If we're trigging a prune manually then ignore the check on the split's parent that bails
|
||||
// out early.
|
||||
let force = true;
|
||||
db.try_prune_execution_payloads(force)
|
||||
}
|
||||
|
||||
/// Run the database manager, returning an error string if the operation did not succeed.
|
||||
pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, mut env: Environment<T>) -> Result<(), String> {
|
||||
let client_config = parse_client_config(cli_args, &env)?;
|
||||
@ -273,6 +304,7 @@ pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, mut env: Environment<T>) -> Re
|
||||
let inspect_config = parse_inspect_config(cli_args)?;
|
||||
inspect_db(inspect_config, client_config, &context, log)
|
||||
}
|
||||
("prune_payloads", Some(_)) => prune_payloads(client_config, &context, log),
|
||||
_ => {
|
||||
return Err("Unknown subcommand, for help `lighthouse database_manager --help`".into())
|
||||
}
|
||||
|
@ -1227,6 +1227,19 @@ fn compact_db_flag() {
|
||||
.with_config(|config| assert!(config.store.compact_on_init));
|
||||
}
|
||||
#[test]
|
||||
fn prune_payloads_on_startup_default() {
|
||||
CommandLineTest::new()
|
||||
.run_with_zero_port()
|
||||
.with_config(|config| assert!(config.store.prune_payloads_on_init));
|
||||
}
|
||||
#[test]
|
||||
fn prune_payloads_on_startup_false() {
|
||||
CommandLineTest::new()
|
||||
.flag("prune-payloads-on-startup", Some("false"))
|
||||
.run_with_zero_port()
|
||||
.with_config(|config| assert!(!config.store.prune_payloads_on_init));
|
||||
}
|
||||
#[test]
|
||||
fn reconstruct_historic_states_flag() {
|
||||
CommandLineTest::new()
|
||||
.flag("reconstruct-historic-states", None)
|
||||
|
Loading…
Reference in New Issue
Block a user