Refined payload pruning (#3587)

## Proposed Changes

Improve the payload pruning feature in several ways:

- Payload pruning is now entirely optional. It is enabled by default but can be disabled with `--prune-payloads false`. The previous `--prune-payloads-on-startup` flag from #3565 is removed.
- Initial payload pruning on startup now runs in a background thread. This thread will always load the split state, which is a small fraction of its total work (up to ~300ms) and then backtrack from that state. This pruning process ran in 2m5s on one Prater node with good I/O and 16m on a node with slower I/O.
- To work with the optional payload pruning the database function `try_load_full_block` will now attempt to load execution payloads for finalized slots _if_ pruning is currently disabled. This gives users an opt-out for the extensive traffic between the CL and EL for reconstructing payloads.

## Additional Info

If the `prune-payloads` flag is toggled on and off then the on-startup check may not see any payloads to delete and fail to clean them up. In this case the `lighthouse db prune_payloads` command should be used to force a manual sweep of the database.
This commit is contained in:
Michael Sproul 2022-09-19 07:58:49 +00:00
parent f2ac0738d8
commit 507bb9dad4
6 changed files with 85 additions and 39 deletions

View File

@ -266,13 +266,6 @@ where
self.genesis_time = Some(genesis_state.genesis_time());
// Prune finalized execution payloads.
if store.get_config().prune_payloads_on_init {
store
.try_prune_execution_payloads(false)
.map_err(|e| format!("Error pruning execution payloads: {e:?}"))?;
}
self.op_pool = Some(
store
.get_item::<PersistedOperationPool<TEthSpec>>(&OP_POOL_DB_KEY)
@ -863,6 +856,20 @@ where
beacon_chain.store_migrator.process_reconstruction();
}
// Prune finalized execution payloads in the background.
if beacon_chain.store.get_config().prune_payloads {
let store = beacon_chain.store.clone();
let log = log.clone();
beacon_chain.task_executor.spawn_blocking(
move || {
if let Err(e) = store.try_prune_execution_payloads(false) {
error!(log, "Error pruning payloads in background"; "error" => ?e);
}
},
"prune_payloads_background",
);
}
Ok(beacon_chain)
}
}

View File

@ -516,9 +516,11 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.default_value("true")
)
.arg(
Arg::with_name("prune-payloads-on-startup")
.long("prune-payloads-on-startup")
.help("Check for execution payloads to prune on start-up.")
Arg::with_name("prune-payloads")
.long("prune-payloads")
.help("Prune execution payloads from Lighthouse's database. This saves space but \
imposes load on the execution client, as payloads need to be \
reconstructed and sent to syncing peers.")
.takes_value(true)
.default_value("true")
)

View File

@ -358,10 +358,8 @@ pub fn get_config<E: EthSpec>(
.map_err(|_| "auto-compact-db takes a boolean".to_string())?;
}
if let Some(prune_payloads_on_init) =
clap_utils::parse_optional(cli_args, "prune-payloads-on-startup")?
{
client_config.store.prune_payloads_on_init = prune_payloads_on_init;
if let Some(prune_payloads) = clap_utils::parse_optional(cli_args, "prune-payloads")? {
client_config.store.prune_payloads = prune_payloads;
}
/*

View File

@ -21,8 +21,8 @@ pub struct StoreConfig {
pub compact_on_init: bool,
/// Whether to compact the database during database pruning.
pub compact_on_prune: bool,
/// Whether to try pruning execution payloads on initialization.
pub prune_payloads_on_init: bool,
/// Whether to prune payloads on initialization and finalization.
pub prune_payloads: bool,
}
/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
@ -45,7 +45,7 @@ impl Default for StoreConfig {
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
compact_on_init: false,
compact_on_prune: true,
prune_payloads_on_init: true,
prune_payloads: true,
}
}
}

View File

@ -21,6 +21,7 @@ use crate::{
get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
PartialBeaconState, StoreItem, StoreOp,
};
use itertools::process_results;
use leveldb::iterator::LevelDBIterator;
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
@ -334,8 +335,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
};
// If the block is after the split point then we should have the full execution payload
// stored in the database. Otherwise, just return the blinded block.
// Hold the split lock so that it can't change.
// stored in the database. If it isn't but payload pruning is disabled, try to load it
// on-demand.
//
// Hold the split lock so that it can't change while loading the payload.
let split = self.split.read_recursive();
let block = if blinded_block.message().execution_payload().is_err()
@ -348,6 +351,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.block_cache.lock().put(*block_root, full_block.clone());
DatabaseBlock::Full(full_block)
} else if !self.config.prune_payloads {
// If payload pruning is disabled there's a chance we may have the payload of
// this finalized block. Attempt to load it but don't error in case it's missing.
if let Some(payload) = self.get_execution_payload(block_root)? {
DatabaseBlock::Full(
blinded_block
.try_into_full_block(Some(payload))
.ok_or(Error::AddPayloadLogicError)?,
)
} else {
DatabaseBlock::Blinded(blinded_block)
}
} else {
DatabaseBlock::Blinded(blinded_block)
};
@ -388,7 +403,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blinded_block: SignedBeaconBlock<E, BlindedPayload<E>>,
) -> Result<SignedBeaconBlock<E>, Error> {
if blinded_block.message().execution_payload().is_ok() {
let execution_payload = self.get_execution_payload(block_root)?;
let execution_payload = self
.get_execution_payload(block_root)?
.ok_or(HotColdDBError::MissingExecutionPayload(*block_root))?;
blinded_block.try_into_full_block(Some(execution_payload))
} else {
blinded_block.try_into_full_block(None)
@ -433,9 +450,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn get_execution_payload(
&self,
block_root: &Hash256,
) -> Result<ExecutionPayload<E>, Error> {
self.get_item(block_root)?
.ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into())
) -> Result<Option<ExecutionPayload<E>>, Error> {
self.get_item(block_root)
}
/// Check if the execution payload for a block exists on disk.
@ -1446,19 +1462,36 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// The finalized block may or may not have its execution payload stored, depending on
// whether it was at a skipped slot. However for a fully pruned database its parent
// should *always* have been pruned.
let split_parent_block_root = split_state.get_block_root(split.slot - 1)?;
if !self.execution_payload_exists(split_parent_block_root)? && !force {
// should *always* have been pruned. In case of a long split (no parent found) we
// continue as if the payloads are pruned, as the node probably has other things to worry
// about.
let split_block_root = split_state.get_latest_block_root(split.state_root);
let already_pruned =
process_results(split_state.rev_iter_block_roots(&self.spec), |mut iter| {
iter.find(|(_, block_root)| *block_root != split_block_root)
.map_or(Ok(true), |(_, split_parent_root)| {
self.execution_payload_exists(&split_parent_root)
.map(|exists| !exists)
})
})??;
if already_pruned && !force {
info!(self.log, "Execution payloads are pruned");
return Ok(());
}
// Iterate block roots backwards to the Bellatrix fork or the anchor slot, whichever comes
// first.
let split_block_root = split_state.get_latest_block_root(split.state_root);
warn!(
self.log,
"Pruning finalized payloads";
"info" => "you may notice degraded I/O performance while this runs"
);
let anchor_slot = self.get_anchor_info().map(|info| info.anchor_slot);
let mut ops = vec![];
let mut last_pruned_block_root = None;
for res in std::iter::once(Ok((split_block_root, split.slot)))
.chain(BlockRootsIterator::new(self, &split_state))
@ -1468,7 +1501,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Err(e) => {
warn!(
self.log,
"Stopping backtrack early";
"Stopping payload pruning early";
"error" => ?e,
);
break;
@ -1478,25 +1511,28 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
if slot < bellatrix_fork_slot {
info!(
self.log,
"Finished backtrack to Bellatrix fork";
"Payload pruning reached Bellatrix boundary";
);
break;
}
if self.execution_payload_exists(&block_root)? {
if Some(block_root) != last_pruned_block_root
&& self.execution_payload_exists(&block_root)?
{
debug!(
self.log,
"Pruning execution payload";
"slot" => slot,
"block_root" => ?block_root,
);
last_pruned_block_root = Some(block_root);
ops.push(StoreOp::DeleteExecutionPayload(block_root));
}
if Some(slot) == anchor_slot {
info!(
self.log,
"Finished backtrack to anchor state";
"Payload pruning reached anchor state";
"slot" => slot
);
break;
@ -1583,10 +1619,13 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Delete the old summary, and the full state if we lie on an epoch boundary.
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
// Delete the execution payload. Even if this execution payload is the payload of the
// new finalized block it is OK to delete it, as `try_get_full_block` looks at the split
// slot when determining whether to reconstruct payloads.
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
// Delete the execution payload if payload pruning is enabled. At a skipped slot we may
// delete the payload for the finalized block itself, but that's OK as we only guarantee
// that payloads are present for slots >= the split slot. The payload fetching code is also
// forgiving of missing payloads.
if store.config.prune_payloads {
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
}
}
// Warning: Critical section. We have to take care not to put any of the two databases in an

View File

@ -1227,17 +1227,17 @@ fn compact_db_flag() {
.with_config(|config| assert!(config.store.compact_on_init));
}
#[test]
fn prune_payloads_on_startup_default() {
fn prune_payloads_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| assert!(config.store.prune_payloads_on_init));
.with_config(|config| assert!(config.store.prune_payloads));
}
#[test]
fn prune_payloads_on_startup_false() {
CommandLineTest::new()
.flag("prune-payloads-on-startup", Some("false"))
.flag("prune-payloads", Some("false"))
.run_with_zero_port()
.with_config(|config| assert!(!config.store.prune_payloads_on_init));
.with_config(|config| assert!(!config.store.prune_payloads));
}
#[test]
fn reconstruct_historic_states_flag() {