diff --git a/.gitignore b/.gitignore
index ae9f83c46..2c656632a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,6 @@ genesis.ssz
 
 # IntelliJ
 /*.iml
+
+# VSCode
+/.vscode
\ No newline at end of file
diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 221d380a8..741d9a95b 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -611,10 +611,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             start_slot,
             end_slot,
             || {
-                (
+                Ok((
                     head.beacon_state.clone_with_only_committee_caches(),
                     head.beacon_block_root,
-                )
+                ))
             },
             &self.spec,
         )?;
@@ -708,10 +708,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             start_slot,
             end_slot,
             || {
-                (
+                Ok((
                     head.beacon_state.clone_with_only_committee_caches(),
                     head.beacon_state_root(),
-                )
+                ))
             },
             &self.spec,
         )?;
@@ -2878,7 +2878,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // is so we don't have to think about lock ordering with respect to the fork choice lock.
         // There are a bunch of places where we lock both fork choice and the pubkey cache and it
         // would be difficult to check that they all lock fork choice first.
-        let mut kv_store_ops = self
+        let mut ops = self
             .validator_pubkey_cache
             .try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
             .ok_or(Error::ValidatorPubkeyCacheLockTimeout)?
@@ -2981,9 +2981,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // ---------------------------- BLOCK PROBABLY ATTESTABLE ----------------------------------
         // Most blocks are now capable of being attested to thanks to the `early_attester_cache`
         // cache above. Resume non-essential processing.
+        //
+        // It is important NOT to return errors here before the database commit, because the block
+        // has already been added to fork choice and the database would be left in an inconsistent
+        // state if we returned early without committing. In other words, an error here would
+        // corrupt the node's database permanently.
         // -----------------------------------------------------------------------------------------
-        self.import_block_update_shuffling_cache(block_root, &mut state)?;
+        self.import_block_update_shuffling_cache(block_root, &mut state);
         self.import_block_observe_attestations(
             block,
             &state,
@@ -3008,25 +3013,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // See https://github.com/sigp/lighthouse/issues/2028
         let (signed_block, blobs) = signed_block.deconstruct();
         let block = signed_block.message();
-        let mut ops: Vec<_> = confirmed_state_roots
-            .into_iter()
-            .map(StoreOp::DeleteStateTemporaryFlag)
-            .collect();
+        ops.extend(
+            confirmed_state_roots
+                .into_iter()
+                .map(StoreOp::DeleteStateTemporaryFlag),
+        );
         ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
         ops.push(StoreOp::PutState(block.state_root(), &state));
-        if let Some(blobs) = blobs {
-            if blobs.blobs.len() > 0 {
-                //FIXME(sean) using this for debugging for now
-                info!(self.log, "Writing blobs to store"; "block_root" => ?block_root);
-                ops.push(StoreOp::PutBlobs(block_root, blobs));
+        // Only consider blobs if the eip4844 fork is enabled.
+        if let Some(data_availability_boundary) = self.data_availability_boundary() {
+            let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
+            let margin_epochs = self.store.get_config().blob_prune_margin_epochs;
+            let import_boundary = data_availability_boundary - margin_epochs;
+
+            // Only store blobs at the data availability boundary, minus any configured epochs
+            // margin, or younger (of higher epoch number).
+            if block_epoch >= import_boundary {
+                if let Some(blobs) = blobs {
+                    if blobs.blobs.len() > 0 {
+                        //FIXME(sean) using this for debugging for now
+                        info!(
+                            self.log, "Writing blobs to store";
+                            "block_root" => ?block_root
+                        );
+                        ops.push(StoreOp::PutBlobs(block_root, blobs));
+                    }
+                }
             }
-        };
+        }
+
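The import-boundary check above is pure epoch arithmetic, so it can be sanity-checked in isolation. A minimal sketch with plain integers standing in for `Epoch` (all values illustrative):

// Illustrative mirror of the blob-import check above: blobs are only written
// when the block's epoch is at or past the boundary, minus any margin.
fn should_store_blobs(block_epoch: u64, data_availability_boundary: u64, margin_epochs: u64) -> bool {
    let import_boundary = data_availability_boundary - margin_epochs;
    block_epoch >= import_boundary
}

fn main() {
    assert!(should_store_blobs(4096, 4096, 0)); // exactly at the boundary: stored
    assert!(!should_store_blobs(4000, 4096, 0)); // older than the boundary: skipped
    assert!(should_store_blobs(4094, 4096, 2)); // a 2-epoch margin widens the window
}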
         let txn_lock = self.store.hot_db.begin_rw_transaction();
 
-        kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?);
-
-        if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) {
+        if let Err(e) = self.store.do_atomically(ops) {
             error!(
                 self.log,
                 "Database write failed!";
@@ -3455,13 +3474,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         }
     }
 
+    // For the current and next epoch of this state, ensure we have the shuffling from this
+    // block in our cache.
     fn import_block_update_shuffling_cache(
         &self,
         block_root: Hash256,
         state: &mut BeaconState<T::EthSpec>,
+    ) {
+        if let Err(e) = self.import_block_update_shuffling_cache_fallible(block_root, state) {
+            warn!(
+                self.log,
+                "Failed to prime shuffling cache";
+                "error" => ?e
+            );
+        }
+    }
+
+    fn import_block_update_shuffling_cache_fallible(
+        &self,
+        block_root: Hash256,
+        state: &mut BeaconState<T::EthSpec>,
     ) -> Result<(), BlockError<T::EthSpec>> {
-        // For the current and next epoch of this state, ensure we have the shuffling from this
-        // block in our cache.
         for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
             let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;
diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs
index 164ed8a93..0fd016e0b 100644
--- a/beacon_node/beacon_chain/src/builder.rs
+++ b/beacon_node/beacon_chain/src/builder.rs
@@ -914,6 +914,11 @@ where
             );
         }
 
+        // Prune blobs sidecars older than the blob data availability boundary in the background.
+        beacon_chain
+            .store_migrator
+            .process_prune_blobs(beacon_chain.data_availability_boundary());
+
         Ok(beacon_chain)
     }
 }
diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs
index 19eddf602..0d14e3819 100644
--- a/beacon_node/beacon_chain/src/canonical_head.rs
+++ b/beacon_node/beacon_chain/src/canonical_head.rs
@@ -751,6 +751,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // Drop the old cache head nice and early to try and free the memory as soon as possible.
         drop(old_cached_head);
 
+        // Prune blobs in the background.
+        self.store_migrator
+            .process_prune_blobs(self.data_availability_boundary());
+
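Both call sites above hand the prune off to the store migrator's background thread. A reduced sketch of that hand-off pattern using std channels (a hypothetical simplification; the real `BackgroundMigrator` also supports a blocking mode and merges queued notifications):

use std::sync::mpsc;
use std::thread;

enum Notification {
    PruneBlobs(Option<u64>), // data availability boundary, as a bare epoch number
}

fn main() {
    let (tx, rx) = mpsc::channel::<Notification>();
    // Background worker: drains notifications and runs each prune off the hot path.
    let worker = thread::spawn(move || {
        while let Ok(Notification::PruneBlobs(boundary)) = rx.recv() {
            println!("pruning blobs older than boundary {:?}", boundary);
        }
    });
    // Head-update / startup code just sends and moves on.
    tx.send(Notification::PruneBlobs(Some(4096))).unwrap();
    drop(tx); // close the channel so the worker exits
    worker.join().unwrap();
}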
         // If the finalized checkpoint changed, perform some updates.
         //
         // The `after_finalization` function will take a write-lock on `fork_choice`, therefore it
diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs
index 66f082742..0690d0767 100644
--- a/beacon_node/beacon_chain/src/migrate.rs
+++ b/beacon_node/beacon_chain/src/migrate.rs
@@ -86,6 +86,7 @@ pub enum PruningError {
 pub enum Notification {
     Finalization(FinalizationNotification),
     Reconstruction,
+    PruneBlobs(Option<Epoch>),
 }
 
 pub struct FinalizationNotification {
@@ -152,6 +153,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
+    pub fn process_prune_blobs(&self, data_availability_boundary: Option<Epoch>) {
+        if let Some(Notification::PruneBlobs(data_availability_boundary)) =
+            self.send_background_notification(Notification::PruneBlobs(data_availability_boundary))
+        {
+            Self::run_prune_blobs(self.db.clone(), data_availability_boundary, &self.log);
+        }
+    }
+
     pub fn run_reconstruction(db: Arc<HotColdDB<E, Hot, Cold>>, log: &Logger) {
         if let Err(e) = db.reconstruct_historic_states() {
             error!(
@@ -162,6 +171,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
         }
     }
 
+    pub fn run_prune_blobs(
+        db: Arc<HotColdDB<E, Hot, Cold>>,
+        data_availability_boundary: Option<Epoch>,
+        log: &Logger,
+    ) {
+        if let Err(e) = db.try_prune_blobs(false, data_availability_boundary) {
+            error!(
+                log,
+                "Blobs pruning failed";
+                "error" => ?e,
+            );
+        }
+    }
+
     /// If configured to run in the background, send `notif` to the background thread.
     ///
     /// Return `None` if the message was sent to the background thread, `Some(notif)` otherwise.
@@ -320,11 +343,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
+                        (Notification::Finalization(_), Notification::PruneBlobs(_)) => best,
+                        (Notification::PruneBlobs(_), Notification::Finalization(_)) => other,
+                        (Notification::PruneBlobs(dab1), Notification::PruneBlobs(dab2)) => {
+                            if dab2 > dab1 {
+                                other
+                            } else {
+                                best
+                            }
+                        }
                     });
 
                 match notif {
                     Notification::Reconstruction => Self::run_reconstruction(db.clone(), &log),
                     Notification::Finalization(fin) => Self::run_migration(db.clone(), fin, &log),
+                    Notification::PruneBlobs(dab) => Self::run_prune_blobs(db.clone(), dab, &log),
                 }
             }
         });
@@ -569,10 +602,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs
--- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs
+++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs
 /// Provides a mapping of `validator_index -> validator_publickey`.
@@ -38,7 +38,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
         };
 
         let store_ops = cache.import_new_pubkeys(state)?;
-        store.hot_db.do_atomically(store_ops)?;
+        store.do_atomically(store_ops)?;
 
         Ok(cache)
     }
@@ -79,7 +79,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
     pub fn import_new_pubkeys(
         &mut self,
         state: &BeaconState<T::EthSpec>,
-    ) -> Result<Vec<KeyValueStoreOp>, BeaconChainError> {
+    ) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError> {
         if state.validators().len() > self.pubkeys.len() {
             self.import(
                 state.validators()[self.pubkeys.len()..]
@@ -92,7 +92,10 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
     }
 
     /// Adds zero or more validators to `self`.
-    fn import<I>(&mut self, validator_keys: I) -> Result<Vec<KeyValueStoreOp>, BeaconChainError>
+    fn import<I>(
+        &mut self,
+        validator_keys: I,
+    ) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError>
     where
         I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator,
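When notifications queue up faster than the background thread drains them, the fold above collapses them; for two `PruneBlobs` notifications the higher boundary wins. A reduced, self-contained version of that merge rule (illustrative `u64` epochs):

#[derive(Clone, Copy, Debug, PartialEq)]
enum Notification {
    PruneBlobs(Option<u64>),
}

// Mirror of the PruneBlobs/PruneBlobs arm above: keep the newer (higher) boundary.
fn merge(best: Notification, other: Notification) -> Notification {
    match (best, other) {
        (Notification::PruneBlobs(dab1), Notification::PruneBlobs(dab2)) => {
            if dab2 > dab1 {
                other
            } else {
                best
            }
        }
    }
}

fn main() {
    let queued = [Notification::PruneBlobs(Some(10)), Notification::PruneBlobs(Some(12))];
    let winner = queued
        .into_iter()
        .fold(Notification::PruneBlobs(None), merge);
    // Option<u64> ordering puts None lowest, so the freshest boundary survives.
    assert_eq!(winner, Notification::PruneBlobs(Some(12)));
}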
@@ -112,7 +115,9 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
         // It will be committed atomically when the block that introduced it is written to disk.
         // Notably it is NOT written while the write lock on the cache is held.
         // See: https://github.com/sigp/lighthouse/issues/2327
-        store_ops.push(DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)));
+        store_ops.push(StoreOp::KeyValueOp(
+            DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)),
+        ));
 
         self.pubkeys.push(
             (&pubkey)
@@ -294,7 +299,7 @@ mod test {
         let ops = cache
             .import_new_pubkeys(&state)
             .expect("should import pubkeys");
-        store.hot_db.do_atomically(ops).unwrap();
+        store.do_atomically(ops).unwrap();
 
         check_cache_get(&cache, &keypairs[..]);
         drop(cache);
diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
index 0bd4eebcc..01b7cb43b 100644
--- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
@@ -688,12 +688,10 @@ impl<T: BeaconChainTypes> Worker<T> {
         let serve_blobs_from_slot = if start_epoch < data_availability_boundary {
             // Attempt to serve from the earliest block in our database, falling back to the data
             // availability boundary
-            let oldest_blob_slot = self
-                .chain
-                .store
-                .get_blob_info()
-                .map(|blob_info| blob_info.oldest_blob_slot)
-                .unwrap_or(data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()));
+            let oldest_blob_slot =
+                self.chain.store.get_blob_info().oldest_blob_slot.unwrap_or(
+                    data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()),
+                );
 
             debug!(
                 self.log,
diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs
index c5aef78aa..e711dfca9 100644
--- a/beacon_node/src/cli.rs
+++ b/beacon_node/src/cli.rs
@@ -551,6 +551,31 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 .takes_value(true)
                 .default_value("true")
         )
+        .arg(
+            Arg::with_name("prune-blobs")
+                .long("prune-blobs")
+                .help("Prune blobs from Lighthouse's database when they are older than the data \
+                       availability boundary relative to the current epoch.")
+                .takes_value(true)
+                .default_value("true")
+        )
+        .arg(
+            Arg::with_name("epochs-per-blob-prune")
+                .long("epochs-per-blob-prune")
+                .help("The epoch interval with which to prune blobs from Lighthouse's \
+                       database when they are older than the data availability boundary \
+                       relative to the current epoch.")
+                .takes_value(true)
+                .default_value("1")
+        )
+        .arg(
+            Arg::with_name("blob-prune-margin-epochs")
+                .long("blob-prune-margin-epochs")
+                .help("The margin for blob pruning in epochs. The oldest blobs are pruned \
+                       up until data_availability_boundary - blob_prune_margin_epochs.")
+                .takes_value(true)
+                .default_value("0")
+        )
 
         /*
          * Misc.
diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs
index 1ce0f5f77..7ced91274 100644
--- a/beacon_node/src/config.rs
+++ b/beacon_node/src/config.rs
@@ -411,6 +411,22 @@ pub fn get_config<E: EthSpec>(
         client_config.store.prune_payloads = prune_payloads;
     }
 
+    if let Some(prune_blobs) = clap_utils::parse_optional(cli_args, "prune-blobs")? {
+        client_config.store.prune_blobs = prune_blobs;
+    }
+
+    if let Some(epochs_per_blob_prune) =
+        clap_utils::parse_optional(cli_args, "epochs-per-blob-prune")?
+    {
+        client_config.store.epochs_per_blob_prune = epochs_per_blob_prune;
+    }
+
+    if let Some(blob_prune_margin_epochs) =
+        clap_utils::parse_optional(cli_args, "blob-prune-margin-epochs")?
+    {
+        client_config.store.blob_prune_margin_epochs = blob_prune_margin_epochs;
+    }
+
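One subtlety worth noting: because each of the new flags declares a `default_value`, `parse_optional` never actually sees a missing value, so the `if let Some(...)` branches above always run. A self-contained illustration with clap 2.x (the version this CLI code is written against):

use clap::{App, Arg};

fn main() {
    let matches = App::new("bn")
        .arg(
            Arg::with_name("prune-blobs")
                .long("prune-blobs")
                .takes_value(true)
                .default_value("true"),
        )
        // No --prune-blobs passed on the command line...
        .get_matches_from(vec!["bn"]);

    // ...yet the value is still present thanks to `default_value`.
    let prune_blobs: bool = matches
        .value_of("prune-blobs")
        .expect("default_value guarantees Some")
        .parse()
        .expect("should parse as bool");
    assert!(prune_blobs);
}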
     /*
      * Zero-ports
      *
diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs
index 53d99f75e..ec5ee382b 100644
--- a/beacon_node/store/src/config.rs
+++ b/beacon_node/store/src/config.rs
@@ -8,6 +8,8 @@ pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
 pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
 pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5;
 pub const DEFAULT_BLOB_CACHE_SIZE: usize = 5;
+pub const DEFAULT_EPOCHS_PER_BLOB_PRUNE: u64 = 1;
+pub const DEFAULT_BLOB_PRUNE_MARGIN_EPOCHS: u64 = 0;
 
 /// Database configuration parameters.
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -26,6 +28,13 @@ pub struct StoreConfig {
     pub compact_on_prune: bool,
     /// Whether to prune payloads on initialization and finalization.
     pub prune_payloads: bool,
+    /// Whether to prune blobs older than the blob data availability boundary.
+    pub prune_blobs: bool,
+    /// Frequency of blob pruning in epochs. Default: 1 (every epoch).
+    pub epochs_per_blob_prune: u64,
+    /// The margin for blob pruning in epochs. The oldest blobs are pruned up until
+    /// data_availability_boundary - blob_prune_margin_epochs. Default: 0.
+    pub blob_prune_margin_epochs: u64,
 }
 
 /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
@@ -50,6 +59,9 @@ impl Default for StoreConfig {
             compact_on_init: false,
             compact_on_prune: true,
             prune_payloads: true,
+            prune_blobs: true,
+            epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE,
+            blob_prune_margin_epochs: DEFAULT_BLOB_PRUNE_MARGIN_EPOCHS,
         }
     }
 }
diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs
index 8a0b44197..ac50cc6aa 100644
--- a/beacon_node/store/src/errors.rs
+++ b/beacon_node/store/src/errors.rs
@@ -19,6 +19,8 @@ pub enum Error {
     },
     RlpError(String),
     BlockNotFound(Hash256),
+    /// The blobs sidecar mapping to this block root is older than the data availability boundary.
+    BlobsTooOld(Hash256, Slot),
     NoContinuationData,
     SplitPointModified(Slot, Slot),
     ConfigError(StoreConfigError),
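Downstream code (the CLI parsing above, the pruning logic below) treats these fields as plain knobs layered over `Default`. A minimal sketch of how a consumer overrides just one of them (trimmed-down struct, same defaults as the constants above):

// Trimmed-down mirror of the new StoreConfig knobs.
#[derive(Debug, Clone)]
struct StoreConfig {
    prune_blobs: bool,
    epochs_per_blob_prune: u64,
    blob_prune_margin_epochs: u64,
}

impl Default for StoreConfig {
    fn default() -> Self {
        Self {
            prune_blobs: true,           // prune by default
            epochs_per_blob_prune: 1,    // DEFAULT_EPOCHS_PER_BLOB_PRUNE
            blob_prune_margin_epochs: 0, // DEFAULT_BLOB_PRUNE_MARGIN_EPOCHS
        }
    }
}

fn main() {
    // Keep blobs for two extra epochs past the data availability boundary.
    let config = StoreConfig {
        blob_prune_margin_epochs: 2,
        ..Default::default()
    };
    assert!(config.prune_blobs);
    assert_eq!(config.epochs_per_blob_prune, 1);
}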
diff --git a/beacon_node/store/src/forwards_iter.rs b/beacon_node/store/src/forwards_iter.rs
index 353be6bf0..a78b2b469 100644
--- a/beacon_node/store/src/forwards_iter.rs
+++ b/beacon_node/store/src/forwards_iter.rs
@@ -150,7 +150,7 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
         store: &'a HotColdDB<E, Hot, Cold>,
         start_slot: Slot,
         end_slot: Option<Slot>,
-        get_state: impl FnOnce() -> (BeaconState<E>, Hash256),
+        get_state: impl FnOnce() -> Result<(BeaconState<E>, Hash256)>,
         spec: &ChainSpec,
     ) -> Result<Self> {
         use HybridForwardsIterator::*;
@@ -172,7 +172,7 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
             if end_slot.map_or(false, |end_slot| end_slot < latest_restore_point_slot) {
                 None
             } else {
-                Some(Box::new(get_state()))
+                Some(Box::new(get_state()?))
             };
             PreFinalization {
                 iter,
@@ -180,7 +180,7 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
         } else {
             PostFinalizationLazy {
-                continuation_data: Some(Box::new(get_state())),
+                continuation_data: Some(Box::new(get_state()?)),
                 store,
                 start_slot,
             }
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 82a65883a..99b516ee9 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -38,6 +38,7 @@ use std::marker::PhantomData;
 use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
+use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
 use types::*;
 
 /// On-disk database that stores finalized states efficiently.
@@ -54,7 +55,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     /// The starting slots for the range of blocks & states stored in the database.
     anchor_info: RwLock<Option<AnchorInfo>>,
     /// The starting slots for the range of blobs stored in the database.
-    blob_info: RwLock<Option<BlobInfo>>,
+    blob_info: RwLock<BlobInfo>,
     pub(crate) config: StoreConfig,
     /// Cold database containing compact historical data.
     pub cold_db: Cold,
@@ -108,6 +109,7 @@ pub enum HotColdDBError {
         slots_per_historical_root: u64,
         slots_per_epoch: u64,
     },
+    ZeroEpochsPerBlobPrune,
     RestorePointBlockHashError(BeaconStateError),
     IterationError {
         unexpected_key: BytesKey,
@@ -125,12 +127,12 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
         spec: ChainSpec,
         log: Logger,
     ) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
-        Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
+        Self::verify_config(&config)?;
 
         let db = HotColdDB {
             split: RwLock::new(Split::default()),
             anchor_info: RwLock::new(None),
-            blob_info: RwLock::new(None),
+            blob_info: RwLock::new(BlobInfo::default()),
             cold_db: MemoryStore::open(),
             hot_db: MemoryStore::open(),
             block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
@@ -165,7 +167,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
         let mut db = HotColdDB {
             split: RwLock::new(Split::default()),
             anchor_info: RwLock::new(None),
-            blob_info: RwLock::new(None),
+            blob_info: RwLock::new(BlobInfo::default()),
             cold_db: LevelDB::open(cold_path)?,
             hot_db: LevelDB::open(hot_path)?,
             block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
@@ -226,6 +228,17 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             db.store_schema_version(CURRENT_SCHEMA_VERSION)?;
         }
 
+        if let Some(blob_info) = db.load_blob_info()? {
+            let oldest_blob_slot = blob_info.oldest_blob_slot;
+            *db.blob_info.write() = blob_info;
+
+            info!(
+                db.log,
+                "Blob info loaded from disk";
+                "oldest_blob_slot" => ?oldest_blob_slot,
+            );
+        }
+
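The `FnOnce() -> Result<...>` change above exists so that the lazily-computed continuation state can fail cleanly (the blob-pruning caller later loads the split state from disk inside this closure). A reduced sketch of the pattern (illustrative types):

#[derive(Debug)]
struct Error;

// The continuation state is expensive, so it is only materialized on demand;
// a fallible closure lets that late work surface database errors via `?`.
fn new_iterator<F>(need_state: bool, get_state: F) -> Result<&'static str, Error>
where
    F: FnOnce() -> Result<(Vec<u8>, u64), Error>,
{
    if need_state {
        let (_state, _root) = get_state()?; // propagate instead of panicking
        Ok("iterator with continuation data")
    } else {
        Ok("iterator without continuation data")
    }
}

fn main() -> Result<(), Error> {
    let lazy = || Ok((vec![0u8; 4], 42u64));
    assert_eq!(new_iterator(true, lazy)?, "iterator with continuation data");
    Ok(())
}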
         // Ensure that any on-disk config is compatible with the supplied config.
         if let Some(disk_config) = db.load_config()? {
             db.config.check_compatibility(&disk_config)?;
@@ -477,6 +490,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             .map(|payload| payload.is_some())
     }
 
+    /// Check if the blobs sidecar for a block exists on disk.
+    pub fn blobs_sidecar_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
+        self.get_item::<BlobsSidecar<E>>(block_root)
+            .map(|blobs| blobs.is_some())
+    }
+
     /// Determine whether a block exists in the database.
     pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
         self.hot_db
@@ -646,7 +665,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             self,
             start_slot,
             None,
-            || (end_state, end_block_root),
+            || Ok((end_state, end_block_root)),
             spec,
         )
     }
@@ -655,7 +674,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         &self,
         start_slot: Slot,
         end_slot: Slot,
-        get_state: impl FnOnce() -> (BeaconState<E>, Hash256),
+        get_state: impl FnOnce() -> Result<(BeaconState<E>, Hash256), Error>,
         spec: &ChainSpec,
     ) -> Result<HybridForwardsBlockRootsIterator<E, Hot, Cold>, Error> {
         HybridForwardsBlockRootsIterator::new(self, start_slot, Some(end_slot), get_state, spec)
@@ -672,7 +691,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             self,
             start_slot,
             None,
-            || (end_state, end_state_root),
+            || Ok((end_state, end_state_root)),
             spec,
         )
     }
@@ -681,7 +700,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         &self,
         start_slot: Slot,
         end_slot: Slot,
-        get_state: impl FnOnce() -> (BeaconState<E>, Hash256),
+        get_state: impl FnOnce() -> Result<(BeaconState<E>, Hash256), Error>,
         spec: &ChainSpec,
     ) -> Result<HybridForwardsStateRootsIterator<E, Hot, Cold>, Error> {
         HybridForwardsStateRootsIterator::new(self, start_slot, Some(end_slot), get_state, spec)
@@ -777,6 +796,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
                 }
 
+                StoreOp::DeleteBlobs(block_root) => {
+                    let key = get_key_for_col(DBColumn::BeaconBlob.into(), block_root.as_bytes());
+                    key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
+                }
+
                 StoreOp::DeleteState(state_root, slot) => {
                     let state_summary_key =
                         get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes());
@@ -793,6 +817,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_bytes());
                     key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
                 }
+
+                StoreOp::PutOrphanedBlobsKey(block_root) => {
+                    let db_key =
+                        get_key_for_col(DBColumn::BeaconBlobOrphan.into(), block_root.as_bytes());
+                    key_value_batch.push(KeyValueStoreOp::PutKeyValue(db_key, [].into()));
+                }
+
+                StoreOp::KeyValueOp(kv_op) => {
+                    key_value_batch.push(kv_op);
+                }
             }
         }
         Ok(key_value_batch)
@@ -826,15 +860,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     guard.pop(block_root);
                 }
 
+                StoreOp::DeleteBlobs(block_root) => {
+                    guard_blob.pop(block_root);
+                }
+
                 StoreOp::DeleteState(_, _) => (),
 
                 StoreOp::DeleteExecutionPayload(_) => (),
+
+                StoreOp::PutOrphanedBlobsKey(_) => (),
+
+                StoreOp::KeyValueOp(_) => (),
             }
         }
 
         self.hot_db
             .do_atomically(self.convert_to_kv_batch(batch)?)?;
         drop(guard);
+        drop(guard_blob);
 
         Ok(())
     }
@@ -1046,7 +1089,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         let state_root_iter = self.forwards_state_roots_iterator_until(
             low_restore_point.slot(),
             slot,
-            || (high_restore_point, Hash256::zero()),
+            || Ok((high_restore_point, Hash256::zero())),
             &self.spec,
         )?;
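All of the new blob operations funnel through the same `StoreOp` → `KeyValueStoreOp` lowering, so pruning, block import and the pubkey cache can commit in one atomic batch. A reduced sketch of that lowering (illustrative types and keys, not the store's real schema):

// Illustrative key-value layer.
enum KeyValueStoreOp {
    PutKeyValue(Vec<u8>, Vec<u8>),
    DeleteKey(Vec<u8>),
}

// Illustrative high-level ops, mirroring the shape of StoreOp.
enum StoreOp {
    PutBlobs(u64, Vec<u8>),
    DeleteBlobs(u64),
    KeyValueOp(KeyValueStoreOp), // pre-lowered ops (e.g. cached pubkey writes)
}

fn convert_to_kv_batch(batch: Vec<StoreOp>) -> Vec<KeyValueStoreOp> {
    batch
        .into_iter()
        .map(|op| match op {
            StoreOp::PutBlobs(root, bytes) => {
                KeyValueStoreOp::PutKeyValue(root.to_be_bytes().to_vec(), bytes)
            }
            StoreOp::DeleteBlobs(root) => {
                KeyValueStoreOp::DeleteKey(root.to_be_bytes().to_vec())
            }
            StoreOp::KeyValueOp(kv_op) => kv_op,
        })
        .collect()
}

fn main() {
    let batch = vec![
        StoreOp::DeleteBlobs(7),
        StoreOp::KeyValueOp(KeyValueStoreOp::DeleteKey(vec![1, 2, 3])),
    ];
    assert_eq!(convert_to_kv_batch(batch).len(), 2); // one atomic write batch
}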
@@ -1311,7 +1354,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     /// Get a clone of the store's blob info.
     ///
     /// To do mutations, use `compare_and_set_blob_info`.
-    pub fn get_blob_info(&self) -> Option<BlobInfo> {
+    pub fn get_blob_info(&self) -> BlobInfo {
         self.blob_info.read_recursive().clone()
     }
 
@@ -1322,10 +1365,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     ///
     /// Return a `BlobInfoConcurrentMutation` error if the `prev_value` provided
     /// is not correct.
-    pub fn compare_and_set_blob_info(
+    fn compare_and_set_blob_info(
         &self,
-        prev_value: Option<BlobInfo>,
-        new_value: Option<BlobInfo>,
+        prev_value: BlobInfo,
+        new_value: BlobInfo,
     ) -> Result<KeyValueStoreOp, Error> {
         let mut blob_info = self.blob_info.write();
         if *blob_info == prev_value {
@@ -1333,15 +1376,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             *blob_info = new_value;
             Ok(kv_op)
         } else {
-            Err(Error::AnchorInfoConcurrentMutation)
+            Err(Error::BlobInfoConcurrentMutation)
         }
     }
 
     /// As for `compare_and_set_blob_info`, but also writes the blob info to disk immediately.
-    pub fn compare_and_set_blob_info_with_write(
+    fn compare_and_set_blob_info_with_write(
         &self,
-        prev_value: Option<BlobInfo>,
-        new_value: Option<BlobInfo>,
+        prev_value: BlobInfo,
+        new_value: BlobInfo,
     ) -> Result<(), Error> {
         let kv_store_op = self.compare_and_set_blob_info(prev_value, new_value)?;
         self.hot_db.do_atomically(vec![kv_store_op])
@@ -1356,15 +1399,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     ///
     /// The argument is intended to be `self.blob_info`, but is passed manually to avoid issues
     /// with recursive locking.
-    fn store_blob_info_in_batch(&self, blob_info: &Option<BlobInfo>) -> KeyValueStoreOp {
-        if let Some(ref blob_info) = blob_info {
-            blob_info.as_kv_store_op(BLOB_INFO_KEY)
-        } else {
-            KeyValueStoreOp::DeleteKey(get_key_for_col(
-                DBColumn::BeaconMeta.into(),
-                BLOB_INFO_KEY.as_bytes(),
-            ))
-        }
+    fn store_blob_info_in_batch(&self, blob_info: &BlobInfo) -> KeyValueStoreOp {
+        blob_info.as_kv_store_op(BLOB_INFO_KEY)
     }
 
     /// Return the slot-window describing the available historic states.
@@ -1487,6 +1523,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         self.hot_db.get(state_root)
     }
 
+    /// Verify the parsed config is valid.
+    fn verify_config(config: &StoreConfig) -> Result<(), HotColdDBError> {
+        Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
+        Self::verify_epochs_per_blob_prune(config.epochs_per_blob_prune)
+    }
+
     /// Check that the restore point frequency is valid.
     ///
     /// Specifically, check that it is:
@@ -1517,6 +1559,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         }
     }
 
+    // Check that epochs_per_blob_prune is at least 1 epoch to avoid attempting to prune the same
+    // epochs over and over again.
+    fn verify_epochs_per_blob_prune(epochs_per_blob_prune: u64) -> Result<(), HotColdDBError> {
+        if epochs_per_blob_prune > 0 {
+            Ok(())
+        } else {
+            Err(HotColdDBError::ZeroEpochsPerBlobPrune)
+        }
+    }
+
     /// Run a compaction pass to free up space used by deleted states.
     pub fn compact(&self) -> Result<(), Error> {
         self.hot_db.compact()?;
@@ -1669,6 +1721,166 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         );
         Ok(())
     }
+
+    /// Try to prune blobs, approximating the current epoch from the split slot (an older, lower
+    /// bound). Useful when the data availability boundary is not at hand.
+    pub fn try_prune_most_blobs(&self, force: bool) -> Result<(), Error> {
+        let eip4844_fork = match self.spec.eip4844_fork_epoch {
+            Some(epoch) => epoch,
+            None => {
+                debug!(self.log, "Eip4844 fork is disabled");
+                return Ok(());
+            }
+        };
+        // At best, current_epoch = split_epoch + 2. However, if finalization doesn't advance, the
+        // `split.slot` is not updated and current_epoch > split_epoch + 2.
+        let min_current_epoch = self.get_split_slot().epoch(E::slots_per_epoch()) + Epoch::new(2);
+        let min_data_availability_boundary = std::cmp::max(
+            eip4844_fork,
+            min_current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS),
+        );
+
+        self.try_prune_blobs(force, Some(min_data_availability_boundary))
+    }
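The conservative boundary in `try_prune_most_blobs` is plain saturating epoch arithmetic; because finalization guarantees `current_epoch >= split_epoch + 2`, the approximation can only underestimate the real boundary and therefore never prunes too much. A sketch with mainnet-style numbers (`MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS` assumed to be 4096; the fork epoch is illustrative):

fn main() {
    let min_epochs_for_blobs_sidecars_requests = 4096u64; // assumed spec constant
    let eip4844_fork_epoch = 100_000u64; // illustrative fork epoch

    let split_epoch = 104_000u64;
    let min_current_epoch = split_epoch + 2;
    let min_data_availability_boundary = std::cmp::max(
        eip4844_fork_epoch,
        min_current_epoch.saturating_sub(min_epochs_for_blobs_sidecars_requests),
    );
    assert_eq!(min_data_availability_boundary, 100_000); // clamped to the fork epoch
}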
+
+    /// Try to prune blobs older than the data availability boundary.
+    pub fn try_prune_blobs(
+        &self,
+        force: bool,
+        data_availability_boundary: Option<Epoch>,
+    ) -> Result<(), Error> {
+        let (data_availability_boundary, eip4844_fork) =
+            match (data_availability_boundary, self.spec.eip4844_fork_epoch) {
+                (Some(boundary_epoch), Some(fork_epoch)) => (boundary_epoch, fork_epoch),
+                _ => {
+                    debug!(self.log, "Eip4844 fork is disabled");
+                    return Ok(());
+                }
+            };
+
+        let should_prune_blobs = self.get_config().prune_blobs;
+        if !should_prune_blobs && !force {
+            debug!(
+                self.log,
+                "Blob pruning is disabled";
+                "prune_blobs" => should_prune_blobs
+            );
+            return Ok(());
+        }
+
+        let blob_info = self.get_blob_info();
+        let oldest_blob_slot = blob_info
+            .oldest_blob_slot
+            .unwrap_or(eip4844_fork.start_slot(E::slots_per_epoch()));
+
+        // The last entirely pruned epoch. Blobs sidecar pruning may have stopped early in the
+        // middle of an epoch; otherwise the oldest blob slot is an epoch start slot.
+        let last_pruned_epoch = oldest_blob_slot.epoch(E::slots_per_epoch()) - 1;
+
+        // At most, prune blobs up until the data availability boundary epoch, leaving at least
+        // the blobs of the data availability boundary epoch and younger.
+        let earliest_prunable_epoch = data_availability_boundary - 1;
+        // Stop pruning before reaching the data availability boundary if a margin is configured.
+        let margin_epochs = self.get_config().blob_prune_margin_epochs;
+        let end_epoch = earliest_prunable_epoch - margin_epochs;
+
+        if !force
+            && last_pruned_epoch.as_u64() + self.get_config().epochs_per_blob_prune
+                > end_epoch.as_u64()
+        {
+            info!(self.log, "Blobs sidecars are pruned");
+            return Ok(());
+        }
+
+        // Iterate block roots forwards from the oldest blob slot.
+        debug!(
+            self.log,
+            "Pruning blobs sidecars stored longer than data availability boundary";
+        );
+        // todo(emhane): If we notice degraded I/O for users switching modes (prune_blobs=true to
+        // prune_blobs=false) we could add a warning that only fires on a threshold, e.g. more
+        // than 2x epochs_per_blob_prune epochs without a prune.
+
+        let mut ops = vec![];
+        let mut last_pruned_block_root = None;
+        let end_slot = end_epoch.end_slot(E::slots_per_epoch());
+
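The window computed above determines exactly which slots a prune pass may touch. A worked example of those bounds with plain integers (32 slots per epoch, margin 0, one-epoch prune interval; all values illustrative):

fn main() {
    let slots_per_epoch = 32u64;
    // Suppose blobs are currently stored from slot 131_072 (epoch 4096) onwards
    // and the data availability boundary sits at epoch 8192.
    let oldest_blob_slot = 131_072u64;
    let data_availability_boundary = 8192u64;
    let blob_prune_margin_epochs = 0u64;
    let epochs_per_blob_prune = 1u64;

    let last_pruned_epoch = oldest_blob_slot / slots_per_epoch - 1; // 4095
    let earliest_prunable_epoch = data_availability_boundary - 1;   // 8191
    let end_epoch = earliest_prunable_epoch - blob_prune_margin_epochs;

    // Only proceed if we can advance by at least `epochs_per_blob_prune` epochs.
    assert!(last_pruned_epoch + epochs_per_blob_prune <= end_epoch);
    let end_slot = (end_epoch + 1) * slots_per_epoch - 1; // inclusive end of epoch 8191
    assert_eq!(end_slot, 262_143);
}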
+        for res in self.forwards_block_roots_iterator_until(
+            oldest_blob_slot,
+            end_slot,
+            || {
+                // todo(emhane): In the future, if the data availability boundary is more recent
+                // than the split (finalized) epoch, this code will have to change to decide what
+                // to do with pruned blobs in our not-yet-finalized canonical chain and
+                // not-yet-orphaned forks (see DBColumn::BeaconBlobOrphan).
+                //
+                // Related to review and the spec PRs linked in it:
+                // https://github.com/sigp/lighthouse/pull/3852#pullrequestreview-1244785136
+                let split = self.get_split_info();
+
+                let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
+                    HotColdDBError::MissingSplitState(split.state_root, split.slot),
+                )?;
+                let split_block_root = split_state.get_latest_block_root(split.state_root);
+
+                Ok((split_state, split_block_root))
+            },
+            &self.spec,
+        )? {
+            let (block_root, slot) = match res {
+                Ok(tuple) => tuple,
+                Err(e) => {
+                    warn!(
+                        self.log,
+                        "Stopping blobs sidecar pruning early";
+                        "error" => ?e,
+                    );
+                    break;
+                }
+            };
+
+            if Some(block_root) != last_pruned_block_root
+                && self.blobs_sidecar_exists(&block_root)?
+            {
+                debug!(
+                    self.log,
+                    "Pruning blobs sidecar";
+                    "slot" => slot,
+                    "block_root" => ?block_root,
+                );
+                last_pruned_block_root = Some(block_root);
+                ops.push(StoreOp::DeleteBlobs(block_root));
+            }
+
+            if slot >= end_slot {
+                info!(
+                    self.log,
+                    "Blobs sidecar pruning reached earliest available blobs sidecar";
+                    "slot" => slot
+                );
+                break;
+            }
+        }
+        let blobs_sidecars_pruned = ops.len();
+
+        let update_blob_info = self.compare_and_set_blob_info(
+            blob_info,
+            BlobInfo {
+                oldest_blob_slot: Some(end_slot + 1),
+            },
+        )?;
+        ops.push(StoreOp::KeyValueOp(update_blob_info));
+
+        self.do_atomically(ops)?;
+        info!(
+            self.log,
+            "Blobs sidecar pruning complete";
+            "blobs_sidecars_pruned" => blobs_sidecars_pruned,
+        );
+
+        Ok(())
+    }
 }
 
 /// Advance the split point of the store, moving new finalized states to the freezer.
@@ -1710,6 +1922,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
     // 1. Copy all of the states between the head and the split slot, from the hot DB
     // to the cold DB. Delete the execution payloads of these now-finalized blocks.
     let state_root_iter = RootsIterator::new(&store, frozen_head);
+
     for maybe_tuple in state_root_iter.take_while(|result| match result {
         Ok((_, _, slot)) => {
             slot >= &current_split_slot
@@ -1751,7 +1964,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
     }
 
     // Warning: Critical section. We have to take care not to put any of the two databases in an
-    // inconsistent state if the OS process dies at any point during the freezeing
+    // inconsistent state if the OS process dies at any point during the freezing
     // procedure.
     //
     // Since it is pretty much impossible to be atomic across more than one database, we trade
@@ -1767,7 +1980,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
     let mut split_guard = store.split.write();
     let latest_split_slot = split_guard.slot;
 
-    // Detect a sitation where the split point is (erroneously) changed from more than one
+    // Detect a situation where the split point is (erroneously) changed from more than one
     // place in code.
     if latest_split_slot != current_split_slot {
         error!(
@@ -1811,7 +2024,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
 }
 
 /// Struct for storing the split slot and state root in the database.
-#[derive(Debug, Clone, Copy, PartialEq, Default, Encode, Decode, Deserialize, Serialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Encode, Decode, Deserialize, Serialize)]
 pub struct Split {
     pub(crate) slot: Slot,
     pub(crate) state_root: Hash256,
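One detail in the pruning loop above: forwards block-root iteration repeats the same block root across skip slots, which is why `last_pruned_block_root` guards against pushing duplicate `DeleteBlobs` ops. A reduced sketch of that dedup:

fn main() {
    // Block roots by slot; skip slots repeat the previous root, as in the
    // forwards block-roots iterator.
    let roots_by_slot = [7u64, 7, 8, 8, 8, 9];
    let mut last_pruned_block_root = None;
    let mut ops = vec![];
    for root in roots_by_slot {
        if Some(root) != last_pruned_block_root {
            last_pruned_block_root = Some(root);
            ops.push(root); // one DeleteBlobs op per distinct block root
        }
    }
    assert_eq!(ops, vec![7, 8, 9]);
}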
diff --git a/beacon_node/store/src/impls/execution_payload.rs b/beacon_node/store/src/impls/execution_payload.rs
index ad68d1fba..01a2dba0b 100644
--- a/beacon_node/store/src/impls/execution_payload.rs
+++ b/beacon_node/store/src/impls/execution_payload.rs
@@ -1,7 +1,7 @@
 use crate::{DBColumn, Error, StoreItem};
 use ssz::{Decode, Encode};
 use types::{
-    EthSpec, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844,
+    BlobsSidecar, EthSpec, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844,
     ExecutionPayloadMerge,
 };
 
@@ -25,6 +25,7 @@ macro_rules! impl_store_item {
 impl_store_item!(ExecutionPayloadMerge);
 impl_store_item!(ExecutionPayloadCapella);
 impl_store_item!(ExecutionPayloadEip4844);
+impl_store_item!(BlobsSidecar);
 
 /// This fork-agnostic implementation should be only used for writing.
 ///
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index dfdeab941..1d7e92b80 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -35,6 +35,7 @@ pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split};
 pub use self::leveldb_store::LevelDB;
 pub use self::memory_store::MemoryStore;
 pub use self::partial_beacon_state::PartialBeaconState;
+pub use crate::metadata::BlobInfo;
 pub use errors::Error;
 pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
 pub use metadata::AnchorInfo;
@@ -157,12 +158,15 @@ pub enum StoreOp<'a, E: EthSpec> {
     PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
     PutState(Hash256, &'a BeaconState<E>),
     PutBlobs(Hash256, Arc<BlobsSidecar<E>>),
+    PutOrphanedBlobsKey(Hash256),
     PutStateSummary(Hash256, HotStateSummary),
     PutStateTemporaryFlag(Hash256),
     DeleteStateTemporaryFlag(Hash256),
     DeleteBlock(Hash256),
+    DeleteBlobs(Hash256),
     DeleteState(Hash256, Option<Slot>),
     DeleteExecutionPayload(Hash256),
+    KeyValueOp(KeyValueStoreOp),
 }
 
 /// A unique column identifier.
@@ -175,6 +179,9 @@ pub enum DBColumn {
     BeaconBlock,
     #[strum(serialize = "blb")]
     BeaconBlob,
+    /// Block roots of orphaned beacon blobs.
+    #[strum(serialize = "blo")]
+    BeaconBlobOrphan,
     /// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
     #[strum(serialize = "ste")]
     BeaconState,
diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs
index 15bcaf1bb..b5de0048f 100644
--- a/beacon_node/store/src/metadata.rs
+++ b/beacon_node/store/src/metadata.rs
@@ -120,14 +120,10 @@ impl StoreItem for AnchorInfo {
 }
 
 /// Database parameters relevant to blob sync.
-#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
 pub struct BlobInfo {
-    /// The block root of the next blob that needs to be added to fill in the history.
-    pub oldest_blob_parent: Hash256,
-    /// The slot before which blobs are available.
-    pub oldest_blob_slot: Slot,
-    /// The slot from which blobs are available.
-    pub latest_blob_slot: Slot,
+    /// The slot from which blobs are available, inclusive (>=).
+    pub oldest_blob_slot: Option<Slot>,
 }
 
 impl StoreItem for BlobInfo {
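The new `blo` column keeps orphaned-blob markers in a keyspace separate from the blobs themselves, even though both are keyed by block root. A sketch of the prefix-namespacing idea (the `get_key_for_col` shape is assumed from its call sites above, not copied from the store crate):

// Assumed shape of the key-namespacing helper: column prefix + raw key bytes.
fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
    let mut out = column.as_bytes().to_vec();
    out.extend_from_slice(key);
    out
}

fn main() {
    let block_root = [0xab; 32];
    // Blobs ("blb") and orphaned-blob markers ("blo") for the same block root
    // live in distinct keyspaces thanks to their column prefixes.
    let blob_key = get_key_for_col("blb", &block_root);
    let orphan_key = get_key_for_col("blo", &block_root);
    assert_ne!(blob_key, orphan_key);
}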
diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs
index f795b0790..f614007ae 100644
--- a/consensus/fork_choice/src/fork_choice.rs
+++ b/consensus/fork_choice/src/fork_choice.rs
@@ -768,7 +768,7 @@ where
             .ok_or_else(|| Error::InvalidBlock(InvalidBlock::UnknownParent(block.parent_root())))?;
 
         // Blocks cannot be in the future. If they are, their consideration must be delayed until
-        // the are in the past.
+        // they are in the past.
         //
         // Note: presently, we do not delay consideration. We just drop the block.
         if block.slot() > current_slot {
diff --git a/consensus/types/src/slot_epoch.rs b/consensus/types/src/slot_epoch.rs
index 2716367c7..06f99b988 100644
--- a/consensus/types/src/slot_epoch.rs
+++ b/consensus/types/src/slot_epoch.rs
@@ -76,8 +76,8 @@ impl Slot {
 }
 
 impl Epoch {
-    pub const fn new(slot: u64) -> Epoch {
-        Epoch(slot)
+    pub const fn new(epoch: u64) -> Epoch {
+        Epoch(epoch)
     }
 
     pub fn max_value() -> Epoch {
diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs
index 33accfc05..a33e6c149 100644
--- a/database_manager/src/lib.rs
+++ b/database_manager/src/lib.rs
@@ -65,6 +65,12 @@ pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> {
         .about("Prune finalized execution payloads")
 }
 
+pub fn prune_blobs_app<'a, 'b>() -> App<'a, 'b> {
+    App::new("prune_blobs")
+        .setting(clap::AppSettings::ColoredHelp)
+        .about("Prune blobs older than data availability boundary")
+}
+
 pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
     App::new(CMD)
         .visible_aliases(&["db"])
@@ -88,10 +94,21 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 .help("Data directory for the freezer database.")
                 .takes_value(true),
         )
+        .arg(
+            Arg::with_name("blob-prune-margin-epochs")
+                .long("blob-prune-margin-epochs")
+                .help(
+                    "The margin for blob pruning in epochs. The oldest blobs are pruned \
+                     up until data_availability_boundary - blob_prune_margin_epochs.",
+                )
+                .takes_value(true)
+                .default_value("0"),
+        )
         .subcommand(migrate_cli_app())
         .subcommand(version_cli_app())
         .subcommand(inspect_cli_app())
         .subcommand(prune_payloads_app())
+        .subcommand(prune_blobs_app())
 }
 
 fn parse_client_config(
@@ -110,6 +127,12 @@ fn parse_client_config(
     client_config.store.slots_per_restore_point = sprp;
     client_config.store.slots_per_restore_point_set_explicitly = sprp_explicit;
 
+    if let Some(blob_prune_margin_epochs) =
+        clap_utils::parse_optional(cli_args, "blob-prune-margin-epochs")?
+    {
+        client_config.store.blob_prune_margin_epochs = blob_prune_margin_epochs;
+    }
+
     Ok(client_config)
 }
 
@@ -287,6 +310,29 @@ pub fn prune_payloads(
     db.try_prune_execution_payloads(force)
 }
 
+pub fn prune_blobs<E: EthSpec>(
+    client_config: ClientConfig,
+    runtime_context: &RuntimeContext<E>,
+    log: Logger,
+) -> Result<(), Error> {
+    let spec = &runtime_context.eth2_config.spec;
+    let hot_path = client_config.get_db_path();
+    let cold_path = client_config.get_freezer_db_path();
+
+    let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
+        &hot_path,
+        &cold_path,
+        |_, _, _| Ok(()),
+        client_config.store,
+        spec.clone(),
+        log,
+    )?;
+
+    // If we're triggering a prune manually then ignore the check on `epochs_per_blob_prune` that
+    // bails out early, by passing true as the force parameter.
+    db.try_prune_most_blobs(true)
+}
+
 /// Run the database manager, returning an error string if the operation did not succeed.
 pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, env: Environment<T>) -> Result<(), String> {
     let client_config = parse_client_config(cli_args, &env)?;
@@ -304,6 +350,7 @@ pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, env: Environment<T>) -> Result<(), String> {
             inspect_db(inspect_config, client_config, &context, log)
         }
         ("prune_payloads", Some(_)) => prune_payloads(client_config, &context, log),
+        ("prune_blobs", Some(_)) => prune_blobs(client_config, &context, log),
         _ => {
             return Err("Unknown subcommand, for help `lighthouse database_manager --help`".into())
         }
diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs
index 7e581ee61..237ca2db5 100644
--- a/lighthouse/tests/beacon_node.rs
+++ b/lighthouse/tests/beacon_node.rs
@@ -1341,6 +1341,45 @@ fn prune_payloads_on_startup_false() {
         .with_config(|config| assert!(!config.store.prune_payloads));
 }
 #[test]
+fn prune_blobs_default() {
+    CommandLineTest::new()
+        .run_with_zero_port()
+        .with_config(|config| assert!(config.store.prune_blobs));
+}
+#[test]
+fn prune_blobs_on_startup_false() {
+    CommandLineTest::new()
+        .flag("prune-blobs", Some("false"))
+        .run_with_zero_port()
+        .with_config(|config| assert!(!config.store.prune_blobs));
+}
+#[test]
+fn epochs_per_blob_prune_default() {
+    CommandLineTest::new()
+        .run_with_zero_port()
+        .with_config(|config| assert!(config.store.epochs_per_blob_prune == 1));
+}
+#[test]
+fn epochs_per_blob_prune_on_startup_five() {
+    CommandLineTest::new()
+        .flag("epochs-per-blob-prune", Some("5"))
+        .run_with_zero_port()
+        .with_config(|config| assert!(config.store.epochs_per_blob_prune == 5));
+}
+#[test]
+fn blob_prune_margin_epochs_default() {
+    CommandLineTest::new()
+        .run_with_zero_port()
+        .with_config(|config| assert!(config.store.blob_prune_margin_epochs == 0));
+}
+#[test]
+fn blob_prune_margin_epochs_on_startup_ten() {
+    CommandLineTest::new()
+        .flag("blob-prune-margin-epochs", Some("10"))
+        .run_with_zero_port()
+        .with_config(|config| assert!(config.store.blob_prune_margin_epochs == 10));
+}
+#[test]
 fn reconstruct_historic_states_flag() {
     CommandLineTest::new()
         .flag("reconstruct-historic-states", None)