Merge pull request #3852 from emhane/prune_blobs

Prune blobs
This commit is contained in:
realbigsean 2023-02-08 13:39:26 -05:00 committed by GitHub
commit 41567194e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 525 additions and 78 deletions

3
.gitignore vendored
View File

@ -12,3 +12,6 @@ genesis.ssz
# IntelliJ
/*.iml
# VSCode
/.vscode

View File

@ -611,10 +611,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
start_slot,
end_slot,
|| {
(
Ok((
head.beacon_state.clone_with_only_committee_caches(),
head.beacon_block_root,
)
))
},
&self.spec,
)?;
@ -708,10 +708,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
start_slot,
end_slot,
|| {
(
Ok((
head.beacon_state.clone_with_only_committee_caches(),
head.beacon_state_root(),
)
))
},
&self.spec,
)?;
@ -2878,7 +2878,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// is so we don't have to think about lock ordering with respect to the fork choice lock.
// There are a bunch of places where we lock both fork choice and the pubkey cache and it
// would be difficult to check that they all lock fork choice first.
let mut kv_store_ops = self
let mut ops = self
.validator_pubkey_cache
.try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?
@ -2981,9 +2981,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// ---------------------------- BLOCK PROBABLY ATTESTABLE ----------------------------------
// Most blocks are now capable of being attested to thanks to the `early_attester_cache`
// cache above. Resume non-essential processing.
//
// It is important NOT to return errors here before the database commit, because the block
// has already been added to fork choice and the database would be left in an inconsistent
// state if we returned early without committing. In other words, an error here would
// corrupt the node's database permanently.
// -----------------------------------------------------------------------------------------
self.import_block_update_shuffling_cache(block_root, &mut state)?;
self.import_block_update_shuffling_cache(block_root, &mut state);
self.import_block_observe_attestations(
block,
&state,
@ -3008,25 +3013,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// See https://github.com/sigp/lighthouse/issues/2028
let (signed_block, blobs) = signed_block.deconstruct();
let block = signed_block.message();
let mut ops: Vec<_> = confirmed_state_roots
.into_iter()
.map(StoreOp::DeleteStateTemporaryFlag)
.collect();
ops.extend(
confirmed_state_roots
.into_iter()
.map(StoreOp::DeleteStateTemporaryFlag),
);
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
ops.push(StoreOp::PutState(block.state_root(), &state));
if let Some(blobs) = blobs {
if blobs.blobs.len() > 0 {
//FIXME(sean) using this for debugging for now
info!(self.log, "Writing blobs to store"; "block_root" => ?block_root);
ops.push(StoreOp::PutBlobs(block_root, blobs));
// Only consider blobs if the eip4844 fork is enabled.
if let Some(data_availability_boundary) = self.data_availability_boundary() {
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
let margin_epochs = self.store.get_config().blob_prune_margin_epochs;
let import_boundary = data_availability_boundary - margin_epochs;
// Only store blobs at the data availability boundary, minus any configured epochs
// margin, or younger (of higher epoch number).
if block_epoch >= import_boundary {
if let Some(blobs) = blobs {
if blobs.blobs.len() > 0 {
//FIXME(sean) using this for debugging for now
info!(
self.log, "Writing blobs to store";
"block_root" => ?block_root
);
ops.push(StoreOp::PutBlobs(block_root, blobs));
}
}
}
};
}
let txn_lock = self.store.hot_db.begin_rw_transaction();
kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?);
if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) {
if let Err(e) = self.store.do_atomically(ops) {
error!(
self.log,
"Database write failed!";
@ -3455,13 +3474,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
/// Infallible wrapper around `import_block_update_shuffling_cache_fallible`.
///
/// A failure to prime the shuffling cache is logged at `warn` level and then discarded, so
/// this function never hands an error back to its caller.
fn import_block_update_shuffling_cache(
    &self,
    block_root: Hash256,
    state: &mut BeaconState<T::EthSpec>,
) {
    match self.import_block_update_shuffling_cache_fallible(block_root, state) {
        Ok(()) => {}
        Err(e) => {
            warn!(
                self.log,
                "Failed to prime shuffling cache";
                "error" => ?e
            );
        }
    }
}
fn import_block_update_shuffling_cache_fallible(
&self,
block_root: Hash256,
state: &mut BeaconState<T::EthSpec>,
) -> Result<(), BlockError<T::EthSpec>> {
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;

View File

@ -914,6 +914,11 @@ where
);
}
// Prune blobs sidecars older than the blob data availability boundary in the background.
beacon_chain
.store_migrator
.process_prune_blobs(beacon_chain.data_availability_boundary());
Ok(beacon_chain)
}
}

View File

@ -751,6 +751,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Drop the old cache head nice and early to try and free the memory as soon as possible.
drop(old_cached_head);
// Prune blobs in the background.
self.store_migrator
.process_prune_blobs(self.data_availability_boundary());
// If the finalized checkpoint changed, perform some updates.
//
// The `after_finalization` function will take a write-lock on `fork_choice`, therefore it

View File

@ -86,6 +86,7 @@ pub enum PruningError {
/// Work items that can be handed to the background migrator.
pub enum Notification {
/// Finalization work; dispatched to `run_migration`.
Finalization(FinalizationNotification),
/// Dispatched to `run_reconstruction`.
Reconstruction,
/// Blob-pruning request carrying the data availability boundary epoch (if any);
/// dispatched to `run_prune_blobs`.
PruneBlobs(Option<Epoch>),
}
pub struct FinalizationNotification {
@ -152,6 +153,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}
}
/// Request a blob-pruning run for the given data availability boundary.
///
/// The request is first offered to the background thread. If the notification is handed
/// back (i.e. it was not sent to the background thread), pruning is run directly on the
/// calling thread instead.
pub fn process_prune_blobs(&self, data_availability_boundary: Option<Epoch>) {
    let request = Notification::PruneBlobs(data_availability_boundary);
    match self.send_background_notification(request) {
        Some(Notification::PruneBlobs(boundary)) => {
            Self::run_prune_blobs(self.db.clone(), boundary, &self.log);
        }
        // Nothing was handed back as a `PruneBlobs` request, so there is nothing to run here.
        _ => {}
    }
}
pub fn run_reconstruction(db: Arc<HotColdDB<E, Hot, Cold>>, log: &Logger) {
if let Err(e) = db.reconstruct_historic_states() {
error!(
@ -162,6 +171,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}
}
/// Run a (non-forced) blob-pruning pass against `db`.
///
/// Errors are not returned to the caller; a failed pass is reported at `error` level on `log`.
pub fn run_prune_blobs(
    db: Arc<HotColdDB<E, Hot, Cold>>,
    data_availability_boundary: Option<Epoch>,
    log: &Logger,
) {
    // `false`: respect the store's pruning configuration rather than forcing a prune.
    let outcome = db.try_prune_blobs(false, data_availability_boundary);
    match outcome {
        Ok(()) => {}
        Err(e) => {
            error!(
                log,
                "Blobs pruning failed";
                "error" => ?e,
            );
        }
    }
}
/// If configured to run in the background, send `notif` to the background thread.
///
/// Return `None` if the message was sent to the background thread, `Some(notif)` otherwise.
@ -320,11 +343,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
best
}
}
(Notification::Finalization(_), Notification::PruneBlobs(_)) => best,
(Notification::PruneBlobs(_), Notification::Finalization(_)) => other,
(Notification::PruneBlobs(dab1), Notification::PruneBlobs(dab2)) => {
if dab2 > dab1 {
other
} else {
best
}
}
});
match notif {
Notification::Reconstruction => Self::run_reconstruction(db.clone(), &log),
Notification::Finalization(fin) => Self::run_migration(db.clone(), fin, &log),
Notification::PruneBlobs(dab) => Self::run_prune_blobs(db.clone(), dab, &log),
}
}
});
@ -569,10 +602,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
.into_iter()
.map(Into::into)
.flat_map(|block_root: Hash256| {
[
let mut store_ops = vec![
StoreOp::DeleteBlock(block_root),
StoreOp::DeleteExecutionPayload(block_root),
]
];
if let Ok(true) = store.blobs_sidecar_exists(&block_root) {
// Keep track of non-empty orphaned blobs sidecars.
store_ops.extend([
StoreOp::DeleteBlobs(block_root),
StoreOp::PutOrphanedBlobsKey(block_root),
]);
}
store_ops
})
.chain(
abandoned_states

View File

@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
use std::collections::HashMap;
use std::convert::TryInto;
use std::marker::PhantomData;
use store::{DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreItem};
use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};
/// Provides a mapping of `validator_index -> validator_publickey`.
@ -38,7 +38,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
};
let store_ops = cache.import_new_pubkeys(state)?;
store.hot_db.do_atomically(store_ops)?;
store.do_atomically(store_ops)?;
Ok(cache)
}
@ -79,7 +79,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
pub fn import_new_pubkeys(
&mut self,
state: &BeaconState<T::EthSpec>,
) -> Result<Vec<KeyValueStoreOp>, BeaconChainError> {
) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError> {
if state.validators().len() > self.pubkeys.len() {
self.import(
state.validators()[self.pubkeys.len()..]
@ -92,7 +92,10 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
}
/// Adds zero or more validators to `self`.
fn import<I>(&mut self, validator_keys: I) -> Result<Vec<KeyValueStoreOp>, BeaconChainError>
fn import<I>(
&mut self,
validator_keys: I,
) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError>
where
I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator,
{
@ -112,7 +115,9 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
// It will be committed atomically when the block that introduced it is written to disk.
// Notably it is NOT written while the write lock on the cache is held.
// See: https://github.com/sigp/lighthouse/issues/2327
store_ops.push(DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)));
store_ops.push(StoreOp::KeyValueOp(
DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)),
));
self.pubkeys.push(
(&pubkey)
@ -294,7 +299,7 @@ mod test {
let ops = cache
.import_new_pubkeys(&state)
.expect("should import pubkeys");
store.hot_db.do_atomically(ops).unwrap();
store.do_atomically(ops).unwrap();
check_cache_get(&cache, &keypairs[..]);
drop(cache);

View File

@ -688,12 +688,10 @@ impl<T: BeaconChainTypes> Worker<T> {
let serve_blobs_from_slot = if start_epoch < data_availability_boundary {
// Attempt to serve from the earliest block in our database, falling back to the data
// availability boundary
let oldest_blob_slot = self
.chain
.store
.get_blob_info()
.map(|blob_info| blob_info.oldest_blob_slot)
.unwrap_or(data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()));
let oldest_blob_slot =
self.chain.store.get_blob_info().oldest_blob_slot.unwrap_or(
data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()),
);
debug!(
self.log,

View File

@ -551,6 +551,31 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.takes_value(true)
.default_value("true")
)
// Toggle for blob pruning; takes an explicit value and defaults to "true".
.arg(
    Arg::with_name("prune-blobs")
        .long("prune-blobs")
        .help("Prune blobs from Lighthouse's database when they are older than the \
               data availability boundary relative to the current epoch.")
        .takes_value(true)
        .default_value("true")
)
.arg(
Arg::with_name("epochs-per-blob-prune")
.long("epochs-per-blob-prune")
.help("The epoch interval with which to prune blobs from Lighthouse's \
database when they are older than the data availability boundary \
relative to the current epoch.")
.takes_value(true)
.default_value("1")
)
.arg(
Arg::with_name("blob-prune-margin-epochs")
.long("blob-prune-margin-epochs")
.help("The margin for blob pruning in epochs. The oldest blobs are pruned \
up until data_availability_boundary - blob_prune_margin_epochs.")
.takes_value(true)
.default_value("0")
)
/*
* Misc.

View File

@ -411,6 +411,22 @@ pub fn get_config<E: EthSpec>(
client_config.store.prune_payloads = prune_payloads;
}
if let Some(prune_blobs) = clap_utils::parse_optional(cli_args, "prune-blobs")? {
client_config.store.prune_blobs = prune_blobs;
}
if let Some(epochs_per_blob_prune) =
clap_utils::parse_optional(cli_args, "epochs-per-blob-prune")?
{
client_config.store.epochs_per_blob_prune = epochs_per_blob_prune;
}
if let Some(blob_prune_margin_epochs) =
clap_utils::parse_optional(cli_args, "blob-prune-margin-epochs")?
{
client_config.store.blob_prune_margin_epochs = blob_prune_margin_epochs;
}
/*
* Zero-ports
*

View File

@ -8,6 +8,8 @@ pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5;
pub const DEFAULT_BLOB_CACHE_SIZE: usize = 5;
/// Default for `StoreConfig::epochs_per_blob_prune`: prune blobs every epoch.
pub const DEFAULT_EPOCHS_PER_BLOB_PRUNE: u64 = 1;
// NOTE(review): "PUNE" looks like a typo for "PRUNE"; a rename would have to update every
// user of this constant in the same change.
/// Default for `StoreConfig::blob_prune_margin_epochs`: no margin.
pub const DEFAULT_BLOB_PUNE_MARGIN_EPOCHS: u64 = 0;
/// Database configuration parameters.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@ -26,6 +28,13 @@ pub struct StoreConfig {
pub compact_on_prune: bool,
/// Whether to prune payloads on initialization and finalization.
pub prune_payloads: bool,
/// Whether to prune blobs older than the blob data availability boundary.
pub prune_blobs: bool,
/// Frequency of blob pruning in epochs. Default: 1 (every epoch).
pub epochs_per_blob_prune: u64,
/// The margin for blob pruning in epochs. The oldest blobs are pruned up until
/// data_availability_boundary - blob_prune_margin_epochs. Default: 0.
pub blob_prune_margin_epochs: u64,
}
/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
@ -50,6 +59,9 @@ impl Default for StoreConfig {
compact_on_init: false,
compact_on_prune: true,
prune_payloads: true,
prune_blobs: true,
epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE,
blob_prune_margin_epochs: DEFAULT_BLOB_PUNE_MARGIN_EPOCHS,
}
}
}

View File

@ -19,6 +19,8 @@ pub enum Error {
},
RlpError(String),
BlockNotFound(Hash256),
/// The blobs sidecar mapping to this block root is older than the data availability boundary.
BlobsTooOld(Hash256, Slot),
NoContinuationData,
SplitPointModified(Slot, Slot),
ConfigError(StoreConfigError),

View File

@ -150,7 +150,7 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
store: &'a HotColdDB<E, Hot, Cold>,
start_slot: Slot,
end_slot: Option<Slot>,
get_state: impl FnOnce() -> (BeaconState<E>, Hash256),
get_state: impl FnOnce() -> Result<(BeaconState<E>, Hash256)>,
spec: &ChainSpec,
) -> Result<Self> {
use HybridForwardsIterator::*;
@ -172,7 +172,7 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
if end_slot.map_or(false, |end_slot| end_slot < latest_restore_point_slot) {
None
} else {
Some(Box::new(get_state()))
Some(Box::new(get_state()?))
};
PreFinalization {
iter,
@ -180,7 +180,7 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
}
} else {
PostFinalizationLazy {
continuation_data: Some(Box::new(get_state())),
continuation_data: Some(Box::new(get_state()?)),
store,
start_slot,
}

View File

@ -38,6 +38,7 @@ use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
use types::*;
/// On-disk database that stores finalized states efficiently.
@ -54,7 +55,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// The starting slots for the range of blocks & states stored in the database.
anchor_info: RwLock<Option<AnchorInfo>>,
/// The starting slots for the range of blobs stored in the database.
blob_info: RwLock<Option<BlobInfo>>,
blob_info: RwLock<BlobInfo>,
pub(crate) config: StoreConfig,
/// Cold database containing compact historical data.
pub cold_db: Cold,
@ -108,6 +109,7 @@ pub enum HotColdDBError {
slots_per_historical_root: u64,
slots_per_epoch: u64,
},
ZeroEpochsPerBlobPrune,
RestorePointBlockHashError(BeaconStateError),
IterationError {
unexpected_key: BytesKey,
@ -125,12 +127,12 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
spec: ChainSpec,
log: Logger,
) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
Self::verify_config(&config)?;
let db = HotColdDB {
split: RwLock::new(Split::default()),
anchor_info: RwLock::new(None),
blob_info: RwLock::new(None),
blob_info: RwLock::new(BlobInfo::default()),
cold_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
@ -165,7 +167,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
let mut db = HotColdDB {
split: RwLock::new(Split::default()),
anchor_info: RwLock::new(None),
blob_info: RwLock::new(None),
blob_info: RwLock::new(BlobInfo::default()),
cold_db: LevelDB::open(cold_path)?,
hot_db: LevelDB::open(hot_path)?,
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
@ -226,6 +228,17 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
db.store_schema_version(CURRENT_SCHEMA_VERSION)?;
}
if let Some(blob_info) = db.load_blob_info()? {
let oldest_blob_slot = blob_info.oldest_blob_slot;
*db.blob_info.write() = blob_info;
info!(
db.log,
"Blob info loaded from disk";
"oldest_blob_slot" => ?oldest_blob_slot,
);
}
// Ensure that any on-disk config is compatible with the supplied config.
if let Some(disk_config) = db.load_config()? {
db.config.check_compatibility(&disk_config)?;
@ -477,6 +490,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.map(|payload| payload.is_some())
}
/// Report whether a blobs sidecar is stored for the block with root `block_root`.
///
/// Returns the lookup error unchanged if reading the item from the store fails.
pub fn blobs_sidecar_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
    let sidecar = self.get_item::<BlobsSidecar<E>>(block_root)?;
    Ok(sidecar.is_some())
}
/// Determine whether a block exists in the database.
pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
self.hot_db
@ -646,7 +665,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self,
start_slot,
None,
|| (end_state, end_block_root),
|| Ok((end_state, end_block_root)),
spec,
)
}
@ -655,7 +674,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self,
start_slot: Slot,
end_slot: Slot,
get_state: impl FnOnce() -> (BeaconState<E>, Hash256),
get_state: impl FnOnce() -> Result<(BeaconState<E>, Hash256), Error>,
spec: &ChainSpec,
) -> Result<HybridForwardsBlockRootsIterator<E, Hot, Cold>, Error> {
HybridForwardsBlockRootsIterator::new(self, start_slot, Some(end_slot), get_state, spec)
@ -672,7 +691,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self,
start_slot,
None,
|| (end_state, end_state_root),
|| Ok((end_state, end_state_root)),
spec,
)
}
@ -681,7 +700,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self,
start_slot: Slot,
end_slot: Slot,
get_state: impl FnOnce() -> (BeaconState<E>, Hash256),
get_state: impl FnOnce() -> Result<(BeaconState<E>, Hash256), Error>,
spec: &ChainSpec,
) -> Result<HybridForwardsStateRootsIterator<E, Hot, Cold>, Error> {
HybridForwardsStateRootsIterator::new(self, start_slot, Some(end_slot), get_state, spec)
@ -777,6 +796,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
}
StoreOp::DeleteBlobs(block_root) => {
let key = get_key_for_col(DBColumn::BeaconBlob.into(), block_root.as_bytes());
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
}
StoreOp::DeleteState(state_root, slot) => {
let state_summary_key =
get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes());
@ -793,6 +817,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_bytes());
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
}
StoreOp::PutOrphanedBlobsKey(block_root) => {
let db_key =
get_key_for_col(DBColumn::BeaconBlobOrphan.into(), block_root.as_bytes());
key_value_batch.push(KeyValueStoreOp::PutKeyValue(db_key, [].into()));
}
StoreOp::KeyValueOp(kv_op) => {
key_value_batch.push(kv_op);
}
}
}
Ok(key_value_batch)
@ -826,15 +860,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
guard.pop(block_root);
}
StoreOp::DeleteBlobs(block_root) => {
guard_blob.pop(block_root);
}
StoreOp::DeleteState(_, _) => (),
StoreOp::DeleteExecutionPayload(_) => (),
StoreOp::PutOrphanedBlobsKey(_) => (),
StoreOp::KeyValueOp(_) => (),
}
}
self.hot_db
.do_atomically(self.convert_to_kv_batch(batch)?)?;
drop(guard);
drop(guard_blob);
Ok(())
}
@ -1046,7 +1089,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let state_root_iter = self.forwards_state_roots_iterator_until(
low_restore_point.slot(),
slot,
|| (high_restore_point, Hash256::zero()),
|| Ok((high_restore_point, Hash256::zero())),
&self.spec,
)?;
@ -1311,7 +1354,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Get a clone of the store's blob info.
///
/// To do mutations, use `compare_and_set_blob_info`.
pub fn get_blob_info(&self) -> Option<BlobInfo> {
pub fn get_blob_info(&self) -> BlobInfo {
self.blob_info.read_recursive().clone()
}
@ -1322,10 +1365,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
///
/// Return a `BlobInfoConcurrentMutation` error if the `prev_value` provided
/// is not correct.
pub fn compare_and_set_blob_info(
fn compare_and_set_blob_info(
&self,
prev_value: Option<BlobInfo>,
new_value: Option<BlobInfo>,
prev_value: BlobInfo,
new_value: BlobInfo,
) -> Result<KeyValueStoreOp, Error> {
let mut blob_info = self.blob_info.write();
if *blob_info == prev_value {
@ -1333,15 +1376,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
*blob_info = new_value;
Ok(kv_op)
} else {
Err(Error::AnchorInfoConcurrentMutation)
Err(Error::BlobInfoConcurrentMutation)
}
}
/// As for `compare_and_set_blob_info`, but also writes the blob info to disk immediately.
pub fn compare_and_set_blob_info_with_write(
fn compare_and_set_blob_info_with_write(
&self,
prev_value: Option<BlobInfo>,
new_value: Option<BlobInfo>,
prev_value: BlobInfo,
new_value: BlobInfo,
) -> Result<(), Error> {
let kv_store_op = self.compare_and_set_blob_info(prev_value, new_value)?;
self.hot_db.do_atomically(vec![kv_store_op])
@ -1356,15 +1399,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
///
/// The argument is intended to be `self.blob_info`, but is passed manually to avoid issues
/// with recursive locking.
fn store_blob_info_in_batch(&self, blob_info: &Option<BlobInfo>) -> KeyValueStoreOp {
if let Some(ref blob_info) = blob_info {
blob_info.as_kv_store_op(BLOB_INFO_KEY)
} else {
KeyValueStoreOp::DeleteKey(get_key_for_col(
DBColumn::BeaconMeta.into(),
BLOB_INFO_KEY.as_bytes(),
))
}
fn store_blob_info_in_batch(&self, blob_info: &BlobInfo) -> KeyValueStoreOp {
blob_info.as_kv_store_op(BLOB_INFO_KEY)
}
/// Return the slot-window describing the available historic states.
@ -1487,6 +1523,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.hot_db.get(state_root)
}
/// Verify that a parsed config is valid.
///
/// Checks the restore point frequency first, then the blob prune frequency, and returns the
/// first error encountered.
fn verify_config(config: &StoreConfig) -> Result<(), HotColdDBError> {
    let restore_point_interval = config.slots_per_restore_point;
    let blob_prune_interval = config.epochs_per_blob_prune;
    Self::verify_slots_per_restore_point(restore_point_interval)?;
    Self::verify_epochs_per_blob_prune(blob_prune_interval)?;
    Ok(())
}
/// Check that the restore point frequency is valid.
///
/// Specifically, check that it is:
@ -1517,6 +1559,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}
/// Check that the blob prune frequency is valid.
///
/// `epochs_per_blob_prune` must be at least 1 epoch, to avoid attempting to prune the same
/// epochs over and over again; zero is rejected with `ZeroEpochsPerBlobPrune`.
fn verify_epochs_per_blob_prune(epochs_per_blob_prune: u64) -> Result<(), HotColdDBError> {
    match epochs_per_blob_prune {
        0 => Err(HotColdDBError::ZeroEpochsPerBlobPrune),
        _ => Ok(()),
    }
}
/// Run a compaction pass to free up space used by deleted states.
pub fn compact(&self) -> Result<(), Error> {
self.hot_db.compact()?;
@ -1669,6 +1721,166 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
);
Ok(())
}
/// Try to prune blobs when the data availability boundary is not at hand.
///
/// The current epoch is approximated from below (the older end) using the split slot, and a
/// data availability boundary derived from that estimate is passed to `try_prune_blobs`.
/// Does nothing if the eip4844 fork epoch is not set in the spec.
pub fn try_prune_most_blobs(&self, force: bool) -> Result<(), Error> {
    let fork_epoch = if let Some(epoch) = self.spec.eip4844_fork_epoch {
        epoch
    } else {
        debug!(self.log, "Eip4844 fork is disabled");
        return Ok(());
    };

    // In the best case the current epoch is exactly two epochs past the split epoch. When
    // finalization stalls `split.slot` stops moving, so the real current epoch can only be
    // later than this estimate.
    let split_epoch = self.get_split_slot().epoch(E::slots_per_epoch());
    let current_epoch_lower_bound = split_epoch + Epoch::new(2);

    // Never place the boundary before the fork epoch itself.
    let boundary_candidate =
        current_epoch_lower_bound.saturating_sub(*MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS);
    let boundary_lower_bound = std::cmp::max(fork_epoch, boundary_candidate);

    self.try_prune_blobs(force, Some(boundary_lower_bound))
}
/// Try to prune blobs older than the data availability boundary.
///
/// * `force` - when `true`, prune even if `prune_blobs` is disabled in the store config, and
///   skip the `epochs_per_blob_prune` frequency check.
/// * `data_availability_boundary` - epoch of the data availability boundary. `None` is
///   handled the same way as the eip4844 fork epoch being unset: nothing is pruned.
///
/// Returns `Ok(())` without writing to the database when the fork is disabled, when pruning is
/// disabled (and not forced), or when too few epochs have passed since the last prune.
/// Otherwise the sidecar deletions and the updated blob info are committed in one batch.
pub fn try_prune_blobs(
&self,
force: bool,
data_availability_boundary: Option<Epoch>,
) -> Result<(), Error> {
// Both the boundary and the fork epoch must be known, otherwise there is nothing to prune.
let (data_availability_boundary, eip4844_fork) =
match (data_availability_boundary, self.spec.eip4844_fork_epoch) {
(Some(boundary_epoch), Some(fork_epoch)) => (boundary_epoch, fork_epoch),
_ => {
debug!(self.log, "Eip4844 fork is disabled");
return Ok(());
}
};
let should_prune_blobs = self.get_config().prune_blobs;
if !should_prune_blobs && !force {
debug!(
self.log,
"Blob pruning is disabled";
"prune_blobs" => should_prune_blobs
);
return Ok(());
}
// Snapshot of the blob info; reused below as the expected previous value for the
// compare-and-set.
let blob_info = self.get_blob_info();
// With no recorded oldest blob slot, start from the first slot of the eip4844 fork epoch.
let oldest_blob_slot = blob_info
.oldest_blob_slot
.unwrap_or(eip4844_fork.start_slot(E::slots_per_epoch()));
// The last entirely pruned epoch. Blobs sidecar pruning may have stopped early in the
// middle of an epoch; otherwise the oldest blob slot is an epoch start slot.
// NOTE(review): the `- 1` here and below relies on `Epoch` subtraction not underflowing
// at epoch 0 — confirm.
let last_pruned_epoch = oldest_blob_slot.epoch(E::slots_per_epoch()) - 1;
// At most prune blobs up until the data availability boundary epoch, leaving at least
// blobs of the data availability boundary epoch and younger.
let earliest_prunable_epoch = data_availability_boundary - 1;
// Stop pruning before reaching the data availability boundary if a margin is configured.
let margin_epochs = self.get_config().blob_prune_margin_epochs;
let end_epoch = earliest_prunable_epoch - margin_epochs;
// Unless forced, only prune once at least `epochs_per_blob_prune` epochs are prunable.
if !force {
if last_pruned_epoch.as_u64() + self.get_config().epochs_per_blob_prune
> end_epoch.as_u64()
{
info!(self.log, "Blobs sidecars are pruned");
return Ok(());
}
}
// Iterate block roots forwards from the oldest blob slot.
debug!(
self.log,
"Pruning blobs sidecars stored longer than data availability boundary";
);
// todo(emhane): If we notice degraded I/O for users switching modes (prune_blobs=true to
// prune_blobs=false) we could add a warning that only fires on a threshold, e.g. more
// than 2x epochs_per_blob_prune epochs without a prune.
// Deletions are collected here and committed together with the blob info update.
let mut ops = vec![];
let mut last_pruned_block_root = None;
let end_slot = end_epoch.end_slot(E::slots_per_epoch());
for res in self.forwards_block_roots_iterator_until(
oldest_blob_slot,
end_slot,
|| {
// todo(emhane): In the future, if the data availability boundary is more recent
// than the split (finalized) epoch, this code will have to change to decide what
// to do with pruned blobs in our not-yet-finalized canonical chain and
// not-yet-orphaned forks (see DBColumn::BeaconBlobOrphan).
//
// Related to review and the spec PRs linked in it:
// https://github.com/sigp/lighthouse/pull/3852#pullrequestreview-1244785136
// The split state and its latest block root are what the iterator is given as its
// end state.
let split = self.get_split_info();
let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
HotColdDBError::MissingSplitState(split.state_root, split.slot),
)?;
let split_block_root = split_state.get_latest_block_root(split.state_root);
Ok((split_state, split_block_root))
},
&self.spec,
)? {
// An iterator error ends the loop; it is logged and not returned to the caller.
let (block_root, slot) = match res {
Ok(tuple) => tuple,
Err(e) => {
warn!(
self.log,
"Stopping blobs sidecar pruning early";
"error" => ?e,
);
break;
}
};
// Skip a root that was just handled (presumably the iterator can repeat a root across
// skipped slots — confirm) and roots that have no stored sidecar.
if Some(block_root) != last_pruned_block_root
&& self.blobs_sidecar_exists(&block_root)?
{
debug!(
self.log,
"Pruning blobs sidecar";
"slot" => slot,
"block_root" => ?block_root,
);
last_pruned_block_root = Some(block_root);
ops.push(StoreOp::DeleteBlobs(block_root));
}
if slot >= end_slot {
info!(
self.log,
"Blobs sidecar pruning reached earliest available blobs sidecar";
"slot" => slot
);
break;
}
}
// Count the deletions before the blob info update is appended to the batch.
let blobs_sidecars_pruned = ops.len();
// NOTE(review): `oldest_blob_slot` is advanced to `end_slot + 1` even when the loop above
// stopped early on an iterator error — confirm that this is intended.
// NOTE(review): `compare_and_set_blob_info` appears to update the in-memory blob info
// before `do_atomically` persists the batch — confirm the two cannot diverge if the write
// fails.
let update_blob_info = self.compare_and_set_blob_info(
blob_info,
BlobInfo {
oldest_blob_slot: Some(end_slot + 1),
},
)?;
ops.push(StoreOp::KeyValueOp(update_blob_info));
self.do_atomically(ops)?;
info!(
self.log,
"Blobs sidecar pruning complete";
"blobs_sidecars_pruned" => blobs_sidecars_pruned,
);
Ok(())
}
}
/// Advance the split point of the store, moving new finalized states to the freezer.
@ -1710,6 +1922,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// 1. Copy all of the states between the head and the split slot, from the hot DB
// to the cold DB. Delete the execution payloads of these now-finalized blocks.
let state_root_iter = RootsIterator::new(&store, frozen_head);
for maybe_tuple in state_root_iter.take_while(|result| match result {
Ok((_, _, slot)) => {
slot >= &current_split_slot
@ -1751,7 +1964,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
}
// Warning: Critical section. We have to take care not to put any of the two databases in an
// inconsistent state if the OS process dies at any point during the freezeing
// inconsistent state if the OS process dies at any point during the freezing
// procedure.
//
// Since it is pretty much impossible to be atomic across more than one database, we trade
@ -1767,7 +1980,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
let mut split_guard = store.split.write();
let latest_split_slot = split_guard.slot;
// Detect a sitation where the split point is (erroneously) changed from more than one
// Detect a situation where the split point is (erroneously) changed from more than one
// place in code.
if latest_split_slot != current_split_slot {
error!(
@ -1811,7 +2024,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
}
/// Struct for storing the split slot and state root in the database.
#[derive(Debug, Clone, Copy, PartialEq, Default, Encode, Decode, Deserialize, Serialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Encode, Decode, Deserialize, Serialize)]
pub struct Split {
pub(crate) slot: Slot,
pub(crate) state_root: Hash256,

View File

@ -1,7 +1,7 @@
use crate::{DBColumn, Error, StoreItem};
use ssz::{Decode, Encode};
use types::{
EthSpec, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844,
BlobsSidecar, EthSpec, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844,
ExecutionPayloadMerge,
};
@ -25,6 +25,7 @@ macro_rules! impl_store_item {
impl_store_item!(ExecutionPayloadMerge);
impl_store_item!(ExecutionPayloadCapella);
impl_store_item!(ExecutionPayloadEip4844);
impl_store_item!(BlobsSidecar);
/// This fork-agnostic implementation should be only used for writing.
///

View File

@ -35,6 +35,7 @@ pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split};
pub use self::leveldb_store::LevelDB;
pub use self::memory_store::MemoryStore;
pub use self::partial_beacon_state::PartialBeaconState;
pub use crate::metadata::BlobInfo;
pub use errors::Error;
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
pub use metadata::AnchorInfo;
@ -157,12 +158,15 @@ pub enum StoreOp<'a, E: EthSpec> {
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
PutState(Hash256, &'a BeaconState<E>),
PutBlobs(Hash256, Arc<BlobsSidecar<E>>),
PutOrphanedBlobsKey(Hash256),
PutStateSummary(Hash256, HotStateSummary),
PutStateTemporaryFlag(Hash256),
DeleteStateTemporaryFlag(Hash256),
DeleteBlock(Hash256),
DeleteBlobs(Hash256),
DeleteState(Hash256, Option<Slot>),
DeleteExecutionPayload(Hash256),
KeyValueOp(KeyValueStoreOp),
}
/// A unique column identifier.
@ -175,6 +179,9 @@ pub enum DBColumn {
BeaconBlock,
#[strum(serialize = "blb")]
BeaconBlob,
/// Block roots of orphaned beacon blobs.
#[strum(serialize = "blo")]
BeaconBlobOrphan,
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
#[strum(serialize = "ste")]
BeaconState,

View File

@ -120,14 +120,10 @@ impl StoreItem for AnchorInfo {
}
/// Database parameters relevant to blob sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct BlobInfo {
/// The block root of the next blob that needs to be added to fill in the history.
pub oldest_blob_parent: Hash256,
/// The slot before which blobs are available.
pub oldest_blob_slot: Slot,
/// The slot from which blobs are available.
pub latest_blob_slot: Slot,
/// The slot from which blobs are available (inclusive, i.e. `>=`).
pub oldest_blob_slot: Option<Slot>,
}
impl StoreItem for BlobInfo {

View File

@ -768,7 +768,7 @@ where
.ok_or_else(|| Error::InvalidBlock(InvalidBlock::UnknownParent(block.parent_root())))?;
// Blocks cannot be in the future. If they are, their consideration must be delayed until
// the are in the past.
// they are in the past.
//
// Note: presently, we do not delay consideration. We just drop the block.
if block.slot() > current_slot {

View File

@ -76,8 +76,8 @@ impl Slot {
}
impl Epoch {
pub const fn new(slot: u64) -> Epoch {
Epoch(slot)
pub const fn new(epoch: u64) -> Epoch {
Epoch(epoch)
}
pub fn max_value() -> Epoch {

View File

@ -65,6 +65,12 @@ pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> {
.about("Prune finalized execution payloads")
}
/// Builds the `prune_blobs` subcommand of the database manager CLI.
///
/// The subcommand declares no arguments of its own; it only sets coloured help output and
/// its description text.
pub fn prune_blobs_app<'a, 'b>() -> App<'a, 'b> {
    let subcommand = App::new("prune_blobs");
    let subcommand = subcommand.setting(clap::AppSettings::ColoredHelp);
    subcommand.about("Prune blobs older than data availability boundary")
}
pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
App::new(CMD)
.visible_aliases(&["db"])
@ -88,10 +94,21 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.help("Data directory for the freezer database.")
.takes_value(true),
)
.arg(
Arg::with_name("blob-prune-margin-epochs")
.long("blob-prune-margin-epochs")
.help(
"The margin for blob pruning in epochs. The oldest blobs are pruned \
up until data_availability_boundary - blob_prune_margin_epochs.",
)
.takes_value(true)
.default_value("0"),
)
.subcommand(migrate_cli_app())
.subcommand(version_cli_app())
.subcommand(inspect_cli_app())
.subcommand(prune_payloads_app())
.subcommand(prune_blobs_app())
}
fn parse_client_config<E: EthSpec>(
@ -110,6 +127,12 @@ fn parse_client_config<E: EthSpec>(
client_config.store.slots_per_restore_point = sprp;
client_config.store.slots_per_restore_point_set_explicitly = sprp_explicit;
if let Some(blob_prune_margin_epochs) =
clap_utils::parse_optional(cli_args, "blob-prune-margin-epochs")?
{
client_config.store.blob_prune_margin_epochs = blob_prune_margin_epochs;
}
Ok(client_config)
}
@ -287,6 +310,29 @@ pub fn prune_payloads<E: EthSpec>(
db.try_prune_execution_payloads(force)
}
/// Opens the hot and freezer databases described by `client_config` and runs a forced blob
/// prune on them via `try_prune_most_blobs`.
///
/// Because this is a manually triggered prune, `true` is passed as the `force` argument so
/// that the prune is not skipped by the early bail-out on `epochs_per_blob_prune`.
///
/// # Errors
///
/// Returns any error produced while opening the database or while pruning.
pub fn prune_blobs<E: EthSpec>(
    client_config: ClientConfig,
    runtime_context: &RuntimeContext<E>,
    log: Logger,
) -> Result<(), Error> {
    let hot_db_path = client_config.get_db_path();
    let freezer_db_path = client_config.get_freezer_db_path();
    let chain_spec = runtime_context.eth2_config.spec.clone();

    let store = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
        &hot_db_path,
        &freezer_db_path,
        // Callback that does nothing and reports success.
        |_, _, _| Ok(()),
        client_config.store,
        chain_spec,
        log,
    )?;

    // Manual invocation: always force the prune.
    store.try_prune_most_blobs(true)
}
/// Run the database manager, returning an error string if the operation did not succeed.
pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, env: Environment<T>) -> Result<(), String> {
let client_config = parse_client_config(cli_args, &env)?;
@ -304,6 +350,7 @@ pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, env: Environment<T>) -> Result
inspect_db(inspect_config, client_config, &context, log)
}
("prune_payloads", Some(_)) => prune_payloads(client_config, &context, log),
("prune_blobs", Some(_)) => prune_blobs(client_config, &context, log),
_ => {
return Err("Unknown subcommand, for help `lighthouse database_manager --help`".into())
}

View File

@ -1341,6 +1341,45 @@ fn prune_payloads_on_startup_false() {
.with_config(|config| assert!(!config.store.prune_payloads));
}
/// Without any blob-pruning flag on the command line, `store.prune_blobs` must be enabled.
#[test]
fn prune_blobs_default() {
    CommandLineTest::new()
        .run_with_zero_port()
        .with_config(|config| {
            assert!(config.store.prune_blobs);
        });
}
/// Passing `--prune-blobs false` must switch `store.prune_blobs` off.
#[test]
fn prune_blobs_on_startup_false() {
    let flag_value = Some("false");
    CommandLineTest::new()
        .flag("prune-blobs", flag_value)
        .run_with_zero_port()
        .with_config(|config| {
            assert!(!config.store.prune_blobs);
        });
}
/// Without `--epochs-per-blob-prune`, the store is expected to prune blobs every epoch.
#[test]
fn epochs_per_blob_prune_default() {
    CommandLineTest::new()
        .run_with_zero_port()
        // Blob-pruning settings live on the store config (like `store.prune_blobs`), not on
        // the top-level client config.
        .with_config(|config| assert!(config.store.epochs_per_blob_prune == 1));
}
/// Passing `--epochs-per-blob-prune 5` must set the store's `epochs_per_blob_prune` to 5.
#[test]
fn epochs_per_blob_prune_on_startup_five() {
    CommandLineTest::new()
        // Flag values are passed as strings, as with `--prune-blobs false`.
        .flag("epochs-per-blob-prune", Some("5"))
        .run_with_zero_port()
        // No leading `!` here: on an integer it is a bitwise NOT, which would make the
        // comparison with 5 always fail.
        .with_config(|config| assert!(config.store.epochs_per_blob_prune == 5));
}
/// Without `--blob-prune-margin-epochs`, the store's blob prune margin must be zero.
#[test]
fn blob_prune_margin_epochs_default() {
    CommandLineTest::new()
        .run_with_zero_port()
        // The margin is a field of the store config, not of the top-level client config.
        .with_config(|config| assert!(config.store.blob_prune_margin_epochs == 0));
}
/// Passing `--blob-prune-margin-epochs 10` must set the store's blob prune margin to 10.
#[test]
fn blob_prune_margin_epochs_on_startup_ten() {
    CommandLineTest::new()
        // Flag values are passed as strings, as with `--prune-blobs false`.
        .flag("blob-prune-margin-epochs", Some("10"))
        .run_with_zero_port()
        // Compared as a plain integer (matching the default test's comparison with `0`) and
        // without a leading `!`, which on an integer would be a bitwise NOT.
        .with_config(|config| assert!(config.store.blob_prune_margin_epochs == 10));
}
#[test]
fn reconstruct_historic_states_flag() {
CommandLineTest::new()
.flag("reconstruct-historic-states", None)