diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs
index fe62f8094..f4e6fc91d 100644
--- a/beacon_node/beacon_chain/src/builder.rs
+++ b/beacon_node/beacon_chain/src/builder.rs
@@ -918,9 +918,13 @@ where
         if beacon_chain.store.get_config().prune_blobs {
             let store = beacon_chain.store.clone();
             let log = log.clone();
+            let current_slot = beacon_chain
+                .slot()
+                .map_err(|e| format!("Failed to get current slot: {:?}", e))?;
+            let current_epoch = current_slot.epoch(TEthSpec::slots_per_epoch());
             beacon_chain.task_executor.spawn_blocking(
                 move || {
-                    if let Err(e) = store.try_prune_blobs(false) {
+                    if let Err(e) = store.try_prune_blobs(false, Some(current_epoch)) {
                         error!(log, "Error pruning blobs in background"; "error" => ?e);
                     }
                 },
diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs
index 1bd998261..63cb143d3 100644
--- a/beacon_node/beacon_chain/src/canonical_head.rs
+++ b/beacon_node/beacon_chain/src/canonical_head.rs
@@ -54,9 +54,8 @@ use slog::{crit, debug, error, warn, Logger};
 use slot_clock::SlotClock;
 use std::sync::Arc;
 use std::time::Duration;
-use store::{iter::StateRootsIterator, KeyValueStoreOp, Split, StoreItem};
+use store::{iter::StateRootsIterator, KeyValueStoreOp, StoreItem};
 use task_executor::{JoinHandle, ShutdownReason};
-use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
 use types::*;
 
 /// Simple wrapper around `RwLock` that uses private visibility to prevent any other modules from
@@ -794,51 +793,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .execution_status
             .is_optimistic_or_invalid();
 
-        if self.store.get_config().prune_blobs {
-            let current_slot = self.slot()?;
-            let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
-            if Some(current_epoch)
-                > self.spec.eip4844_fork_epoch.map(|eip4844_fork_epoch| {
-                    eip4844_fork_epoch + *MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS
-                })
-            {
-                let current_epoch_start_slot =
-                    current_epoch.start_slot(T::EthSpec::slots_per_epoch());
-
-                // Update db's metadata for blobs pruning.
-                if current_slot == current_epoch_start_slot {
-                    if let Some(mut blob_info) = self.store.get_blob_info() {
-                        if let Some(data_availability_boundary) = self.data_availability_boundary()
-                        {
-                            let dab_slot =
-                                data_availability_boundary.end_slot(T::EthSpec::slots_per_epoch());
-                            if let Some(dab_state_root) = self.state_root_at_slot(dab_slot)? {
-                                blob_info.data_availability_boundary =
-                                    Split::new(dab_slot, dab_state_root);
-
-                                self.store.compare_and_set_blob_info_with_write(
-                                    self.store.get_blob_info(),
-                                    Some(blob_info),
-                                )?;
-                            }
-                        }
-                    }
-                }
-            }
-
-            let store = self.store.clone();
-            let log = self.log.clone();
-
-            self.task_executor.spawn_blocking(
-                move || {
-                    if let Err(e) = store.try_prune_blobs(false) {
-                        error!(log, "Error pruning blobs in background"; "error" => ?e);
-                    }
-                },
-                "prune_blobs_background",
-            );
-        }
-
         // Detect and potentially report any re-orgs.
         let reorg_distance = detect_reorg(
             &old_snapshot.beacon_state,
@@ -1060,6 +1014,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // Take a write-lock on the canonical head and signal for it to prune.
         self.canonical_head.fork_choice_write_lock().prune()?;
 
+        // Prune blobs.
+        if self.store.get_config().prune_blobs {
+            let store = self.store.clone();
+            let log = self.log.clone();
+            let current_slot = self.slot()?;
+            let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
+            self.task_executor.spawn_blocking(
+                move || {
+                    if let Err(e) = store.try_prune_blobs(false, Some(current_epoch)) {
+                        error!(log, "Error pruning blobs in background"; "error" => ?e);
+                    }
+                },
+                "prune_blobs_background",
+            );
+        }
+
         Ok(())
     }
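
Both call sites above pass the wall-clock epoch as the new second argument to `try_prune_blobs`, derived from the current slot. A minimal standalone sketch of that derivation, not part of the patch: plain integers stand in for Lighthouse's `Slot`/`Epoch` types, and 32 slots per epoch is assumed.

fn slot_to_epoch(slot: u64, slots_per_epoch: u64) -> u64 {
    // `Slot::epoch` is integer division by the spec's slots-per-epoch value.
    slot / slots_per_epoch
}

fn main() {
    // e.g. slot 6_000_000 with 32 slots per epoch lies in epoch 187_500.
    assert_eq!(slot_to_epoch(6_000_000, 32), 187_500);
}
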
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index e53499438..1982ab2e8 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -38,6 +38,7 @@ use std::marker::PhantomData;
 use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
+use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
 use types::*;
 
 /// On-disk database that stores finalized states efficiently.
@@ -80,6 +81,7 @@ pub enum HotColdDBError {
         target_version: SchemaVersion,
         current_version: SchemaVersion,
     },
+    UnsupportedDataAvailabilityBoundary,
     /// Recoverable error indicating that the database freeze point couldn't be updated
     /// due to the finalized block not lying on an epoch boundary (should be infrequent).
     FreezeSlotUnaligned(Slot),
@@ -94,7 +96,6 @@
     MissingHotStateSummary(Hash256),
     MissingEpochBoundaryState(Hash256),
     MissingSplitState(Hash256, Slot),
-    MissingStateToPruneBlobs(Hash256),
     MissingExecutionPayload(Hash256),
     MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
     MissingAnchorInfo,
@@ -217,15 +218,13 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
         }
 
         if let Some(blob_info) = db.load_blob_info()? {
-            let dab_slot = blob_info.data_availability_boundary.slot;
-            let dab_state_root = blob_info.data_availability_boundary.state_root;
+            let oldest_blob_slot = blob_info.oldest_blob_slot;
             *db.blob_info.write() = Some(blob_info);
 
             info!(
                 db.log,
                 "Blob info loaded from disk";
-                "data_availability_boundary_slot" => dab_slot,
-                "data_availability_boundary_state" => ?dab_state_root,
+                "oldest_blob_slot" => oldest_blob_slot,
             );
         }
 
@@ -1375,7 +1374,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             *blob_info = new_value;
             Ok(kv_op)
         } else {
-            Err(Error::AnchorInfoConcurrentMutation)
+            Err(Error::BlobInfoConcurrentMutation)
         }
     }
 
@@ -1713,25 +1712,42 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     }
 
     /// Try to prune blobs older than the data availability boundary.
-    pub fn try_prune_blobs(&self, force: bool) -> Result<(), Error> {
+    pub fn try_prune_blobs(
+        &self,
+        force: bool,
+        data_availability_boundary: Option<Epoch>,
+    ) -> Result<(), Error> {
         let blob_info = match self.get_blob_info() {
-            Some(old_blob_info) => old_blob_info,
+            Some(blob_info) => blob_info,
             None => {
                 return Ok(());
             }
         };
 
+        let oldest_blob_slot = blob_info.oldest_blob_slot;
+        // The last entirely pruned epoch; blobs sidecar pruning may have stopped early in the
+        // middle of an epoch.
+        let last_pruned_epoch = oldest_blob_slot.epoch(E::slots_per_epoch()) - 1;
+        let next_epoch_to_prune = match data_availability_boundary {
+            Some(epoch) => epoch,
+            None => {
+                // The split slot is set upon finalization and is the first slot in the latest
+                // finalized epoch, hence current_epoch = split_epoch + 1.
+                let current_epoch =
+                    self.get_split_slot().epoch(E::slots_per_epoch()) + Epoch::new(1);
+                current_epoch - *MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS
+            }
+        };
+
         if !force {
-            if blob_info.last_pruned_epoch.as_u64() + self.get_config().epochs_per_blob_prune
-                > blob_info.next_epoch_to_prune.as_u64()
+            if last_pruned_epoch.as_u64() + self.get_config().epochs_per_blob_prune
+                > next_epoch_to_prune.as_u64()
             {
                 info!(self.log, "Blobs sidecars are pruned");
                 return Ok(());
             }
         }
 
-        let dab_state_root = blob_info.data_availability_boundary.state_root;
-
         // Iterate block roots backwards to oldest blob slot.
         warn!(
             self.log,
@@ -1741,18 +1757,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
 
         let mut ops = vec![];
         let mut last_pruned_block_root = None;
+        let end_slot = next_epoch_to_prune.start_slot(E::slots_per_epoch());
 
         for res in self.forwards_block_roots_iterator_until(
-            blob_info.oldest_blob_slot,
-            blob_info.data_availability_boundary.slot,
-            || {
-                let dab_state = self
-                    .get_state(&dab_state_root, None)?
-                    .ok_or(HotColdDBError::MissingStateToPruneBlobs(dab_state_root))?;
-                let dab_block_root = dab_state.get_latest_block_root(dab_state_root);
-
-                Ok((dab_state, dab_block_root))
-            },
+            oldest_blob_slot,
+            end_slot,
+            || Err(HotColdDBError::UnsupportedDataAvailabilityBoundary.into()),
             &self.spec,
         )? {
             let (block_root, slot) = match res {
@@ -1780,7 +1790,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 ops.push(StoreOp::DeleteBlobs(block_root));
             }
 
-            if slot <= blob_info.oldest_blob_slot {
+            if slot >= end_slot {
                 info!(
                     self.log,
                     "Blobs sidecar pruning reached earliest available blobs sidecar";
@@ -1798,12 +1808,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             "blobs_sidecars_pruned" => blobs_sidecars_pruned,
         );
 
-        if let Some(mut new_blob_info) = self.get_blob_info() {
-            new_blob_info.last_pruned_epoch =
-                (blob_info.data_availability_boundary.slot + 1).epoch(E::slots_per_epoch());
-            new_blob_info.oldest_blob_slot = blob_info.data_availability_boundary.slot + 1;
-            self.compare_and_set_blob_info_with_write(self.get_blob_info(), Some(new_blob_info))?;
-        }
+        self.compare_and_set_blob_info_with_write(
+            Some(blob_info),
+            Some(BlobInfo {
+                oldest_blob_slot: end_slot + 1,
+            }),
+        )?;
 
         Ok(())
     }
@@ -1843,9 +1853,6 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
         return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot()).into());
     }
 
-    // Prune blobs before migration.
-    store.try_prune_blobs(true)?;
-
    let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
 
     // 1. Copy all of the states between the head and the split slot, from the hot DB
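
The pruning window introduced in `try_prune_blobs` can be restated as a small standalone sketch. This is not part of the patch: plain integers stand in for `Slot`/`Epoch`, and the parameter names are placeholders for `MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS` and the `epochs_per_blob_prune` config value.

/// Returns the inclusive end slot of the prune if one should run, mirroring the logic
/// added to `try_prune_blobs` above (saturating arithmetic, as Lighthouse's Slot/Epoch
/// types provide).
fn prune_window(
    oldest_blob_slot: u64,
    split_slot: u64,
    data_availability_boundary: Option<u64>,
    slots_per_epoch: u64,
    min_epochs_for_blobs_sidecars_requests: u64,
    epochs_per_blob_prune: u64,
    force: bool,
) -> Option<u64> {
    // The last entirely pruned epoch; pruning may have stopped mid-epoch.
    let last_pruned_epoch = (oldest_blob_slot / slots_per_epoch).saturating_sub(1);
    // Prune up to the supplied boundary epoch, or fall back to one derived from the split
    // slot (the split slot is the first slot of the latest finalized epoch, so the current
    // epoch is split_epoch + 1).
    let next_epoch_to_prune = data_availability_boundary.unwrap_or_else(|| {
        let current_epoch = split_slot / slots_per_epoch + 1;
        current_epoch.saturating_sub(min_epochs_for_blobs_sidecars_requests)
    });
    // Without `force`, skip the prune until at least `epochs_per_blob_prune` epochs have
    // accumulated since the last prune.
    if !force && last_pruned_epoch + epochs_per_blob_prune > next_epoch_to_prune {
        return None;
    }
    // Blobs up to and including this slot are deleted; `oldest_blob_slot` becomes end + 1.
    Some(next_epoch_to_prune * slots_per_epoch)
}
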
diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs
index 37d2e5645..4e7b0df7b 100644
--- a/beacon_node/store/src/metadata.rs
+++ b/beacon_node/store/src/metadata.rs
@@ -1,8 +1,8 @@
-use crate::{DBColumn, Error, Split, StoreItem};
+use crate::{DBColumn, Error, StoreItem};
 use serde_derive::{Deserialize, Serialize};
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode, Encode};
-use types::{Checkpoint, Epoch, Hash256, Slot};
+use types::{Checkpoint, Hash256, Slot};
 
 pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(15);
 
@@ -122,12 +122,6 @@ impl StoreItem for AnchorInfo {
 /// Database parameters relevant to blob sync.
 #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
 pub struct BlobInfo {
-    /// The latest epoch that blobs were pruned.
-    pub last_pruned_epoch: Epoch,
-    /// The next epoch to prune blobs from.
-    pub next_epoch_to_prune: Epoch,
-    /// The state root and slot of the next blobs to prune from.
-    pub data_availability_boundary: Split,
     /// The slot before which blobs are available.
     pub oldest_blob_slot: Slot,
 }
diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs
index 1cd6a3e08..fcf36e540 100644
--- a/database_manager/src/lib.rs
+++ b/database_manager/src/lib.rs
@@ -312,9 +312,9 @@ pub fn prune_blobs(
         log,
     )?;
 
-    // If we're triggering a prune manually then ignore the check on the split's parent that bails
-    // out early by passing true to the force parameter.
-    db.try_prune_blobs(true)
+    // If we're triggering a prune manually, then ignore the check on `epochs_per_blob_prune` that
+    // bails out early by passing true to the force parameter.
+    db.try_prune_blobs(true, None)
 }
 
 /// Run the database manager, returning an error string if the operation did not succeed.
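
For the manual path above, `force == true` skips the `epochs_per_blob_prune` early exit, and passing `None` selects the split-slot fallback for the boundary. A worked example with illustrative numbers, not part of the patch: 32 slots per epoch is assumed, and 4_096 is only a stand-in value for `MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS`.

fn main() {
    let slots_per_epoch = 32_u64;
    let min_epochs_for_blobs_sidecars_requests = 4_096_u64; // assumed value, illustrative only
    let split_slot = 320_000_u64; // first slot of the latest finalized epoch (epoch 10_000)

    let current_epoch = split_slot / slots_per_epoch + 1; // 10_001
    let boundary_epoch = current_epoch - min_epochs_for_blobs_sidecars_requests; // 5_905
    let end_slot = boundary_epoch * slots_per_epoch; // 188_960

    // Blobs sidecars up to and including slot 188_960 would be deleted, and the stored
    // BlobInfo::oldest_blob_slot advanced to 188_961 (end_slot + 1).
    assert_eq!(end_slot, 188_960);
}
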