From d7fc24a9d5fdb162bbf67a19ace52978501e71ab Mon Sep 17 00:00:00 2001
From: Emilia Hane
Date: Fri, 20 Jan 2023 12:12:32 +0100
Subject: [PATCH] Plug in running blob pruning in migrator, related bug fixes
 and add todos

---
 beacon_node/beacon_chain/src/beacon_chain.rs  |  6 ++--
 beacon_node/beacon_chain/src/builder.rs       | 16 ++--------
 .../beacon_chain/src/canonical_head.rs        | 19 +++---------
 beacon_node/store/src/hot_cold_store.rs       | 31 ++++++++++++++-----
 4 files changed, 34 insertions(+), 38 deletions(-)

diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 250c64a1d..4629d8d13 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -3015,9 +3015,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
         ops.push(StoreOp::PutState(block.state_root(), &state));
 
+        // Only store blobs at the data availability boundary or younger.
+        //
+        // todo(emhane): Should we add a margin of one epoch here to ensure data availability
+        // consistency across the network at epoch boundaries?
         let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
-
-        // Only store blobs that haven't passed the data availability boundary.
         if Some(block_epoch) >= self.data_availability_boundary() {
             if let Some(blobs) = blobs? {
                 if blobs.blobs.len() > 0 {
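
The `Some(block_epoch) >= self.data_availability_boundary()` gate above leans on Rust's derived ordering for `Option`, where `None` compares less than any `Some`. Before the eip4844 fork is scheduled the boundary is `None`, so every block passes the check. A minimal standalone sketch of that behaviour, with plain `u64` values standing in for Lighthouse's `Epoch` type:

    // `None` orders before every `Some`, so the gate is permissive while no
    // data availability boundary is known yet, and reduces to a plain epoch
    // comparison otherwise. `u64` stands in for Lighthouse's `Epoch`.
    fn should_store_blobs(block_epoch: u64, data_availability_boundary: Option<u64>) -> bool {
        Some(block_epoch) >= data_availability_boundary
    }

    fn main() {
        // Boundary unknown: every block passes the check.
        assert!(should_store_blobs(0, None));
        // Boundary known: only blobs at the boundary epoch or younger pass.
        assert!(should_store_blobs(10, Some(10)));
        assert!(should_store_blobs(11, Some(10)));
        assert!(!should_store_blobs(9, Some(10)));
    }
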
diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs
index 75e677cdf..0fd016e0b 100644
--- a/beacon_node/beacon_chain/src/builder.rs
+++ b/beacon_node/beacon_chain/src/builder.rs
@@ -915,19 +915,9 @@ where
         }
 
         // Prune blob sidecars older than the blob data availability boundary in the background.
-        if beacon_chain.store.get_config().prune_blobs {
-            let store = beacon_chain.store.clone();
-            let log = log.clone();
-            let data_availability_boundary = beacon_chain.data_availability_boundary();
-            beacon_chain.task_executor.spawn_blocking(
-                move || {
-                    if let Err(e) = store.try_prune_blobs(false, data_availability_boundary) {
-                        error!(log, "Error pruning blobs in background"; "error" => ?e);
-                    }
-                },
-                "prune_blobs_background",
-            );
-        }
+        beacon_chain
+            .store_migrator
+            .process_prune_blobs(beacon_chain.data_availability_boundary());
 
         Ok(beacon_chain)
     }

diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs
index 46a5c5b23..0d14e3819 100644
--- a/beacon_node/beacon_chain/src/canonical_head.rs
+++ b/beacon_node/beacon_chain/src/canonical_head.rs
@@ -751,6 +751,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // Drop the old cache head nice and early to try and free the memory as soon as possible.
         drop(old_cached_head);
 
+        // Prune blobs in the background.
+        self.store_migrator
+            .process_prune_blobs(self.data_availability_boundary());
+
         // If the finalized checkpoint changed, perform some updates.
         //
         // The `after_finalization` function will take a write-lock on `fork_choice`, therefore it
@@ -1014,21 +1018,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // Take a write-lock on the canonical head and signal for it to prune.
         self.canonical_head.fork_choice_write_lock().prune()?;
 
-        // Prune blobs.
-        if self.store.get_config().prune_blobs {
-            let store = self.store.clone();
-            let log = self.log.clone();
-            let data_availability_boundary = self.data_availability_boundary();
-            self.task_executor.spawn_blocking(
-                move || {
-                    if let Err(e) = store.try_prune_blobs(false, data_availability_boundary) {
-                        error!(log, "Error pruning blobs in background"; "error" => ?e);
-                    }
-                },
-                "prune_blobs_background",
-            );
-        }
-
         Ok(())
     }
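
Both pruning call sites above now delegate to the store migrator instead of each spawning their own blocking task; the `prune_blobs` config check they used to duplicate moves into the store's `try_prune_blobs` (see the hot_cold_store.rs diff below). The patch does not include the migrator's internals, so the following is only a sketch of the delegation pattern it relies on, assuming a channel-fed background worker; `Notification`, `Migrator::spawn`, and the `u64` epoch stand-in are hypothetical, not Lighthouse's actual `BackgroundMigrator` API:

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    // Hypothetical message type; the real migrator internals are not shown
    // in this patch.
    enum Notification {
        PruneBlobs { data_availability_boundary: Option<u64> },
    }

    struct Migrator {
        tx: mpsc::Sender<Notification>,
    }

    impl Migrator {
        // Spawn a background worker that owns the expensive, blocking prune
        // routine and drains notifications until all senders are dropped.
        fn spawn(mut prune: impl FnMut(Option<u64>) + Send + 'static) -> Self {
            let (tx, rx) = mpsc::channel();
            thread::spawn(move || {
                while let Ok(Notification::PruneBlobs { data_availability_boundary }) = rx.recv() {
                    prune(data_availability_boundary);
                }
            });
            Migrator { tx }
        }

        // Mirrors the `process_prune_blobs` call sites above: fire-and-forget,
        // the caller never blocks on the prune itself.
        fn process_prune_blobs(&self, data_availability_boundary: Option<u64>) {
            let _ = self.tx.send(Notification::PruneBlobs { data_availability_boundary });
        }
    }

    fn main() {
        let migrator = Migrator::spawn(|boundary| {
            println!("pruning blobs up to boundary {boundary:?}");
        });
        migrator.process_prune_blobs(Some(42));
        // Give the detached worker a moment before the process exits.
        thread::sleep(Duration::from_millis(50));
    }

One property of this pattern worth noting: call sites stay non-blocking, and there is a single place to serialize or rate-limit prunes, instead of one ad hoc `spawn_blocking` per caller.
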
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 54d21a7c2..09a2de9a2 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -1367,7 +1367,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     ///
     /// Return a `BlobInfoConcurrentMutation` error if the `prev_value` provided
     /// is not correct.
-    pub fn compare_and_set_blob_info(
+    fn compare_and_set_blob_info(
         &self,
         prev_value: Option<BlobInfo>,
         new_value: Option<BlobInfo>,
@@ -1383,7 +1383,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     }
 
     /// As for `compare_and_set_blob_info`, but also writes the blob info to disk immediately.
-    pub fn compare_and_set_blob_info_with_write(
+    fn compare_and_set_blob_info_with_write(
         &self,
         prev_value: Option<BlobInfo>,
         new_value: Option<BlobInfo>,
@@ -1751,6 +1751,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             }
         };
 
+        let should_prune_blobs = self.get_config().prune_blobs;
+        if !should_prune_blobs && !force {
+            debug!(self.log, "Blob pruning is disabled";
+                "prune_blobs" => should_prune_blobs
+            );
+            return Ok(());
+        }
+
         let blob_info = || -> BlobInfo {
             if let Some(blob_info) = self.get_blob_info() {
                 if blob_info.oldest_blob_slot.epoch(E::slots_per_epoch()) >= eip4844_fork {
@@ -1758,14 +1766,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 }
             }
             // If BlobInfo is uninitialized this is probably the first time pruning blobs, or
-            // maybe oldest_blob_info has been initialized with Epoch::default.
-            // start from the eip4844 fork epoch. No new blobs are imported into the beacon
-            // chain that are older than the data availability boundary.
+            // maybe oldest_blob_info has been initialized with Epoch::default. Start from the
+            // eip4844 fork epoch.
             BlobInfo {
                 oldest_blob_slot: eip4844_fork.start_slot(E::slots_per_epoch()),
             }
         }();
 
+        // todo(emhane): Should we add a margin for how old the blobs we import may be? If so,
+        // it needs to be reflected here when choosing the oldest slot to prune from.
         let oldest_blob_slot = blob_info.oldest_blob_slot;
         // The last entirely pruned epoch; blob sidecar pruning may have stopped early in the
         // middle of an epoch, otherwise the oldest blob slot is a start slot.
@@ -1789,14 +1798,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         let mut ops = vec![];
         let mut last_pruned_block_root = None;
 
-        let end_slot = data_availability_boundary.end_slot(E::slots_per_epoch());
+        // Prune up to, but not including, the data availability boundary epoch.
+        let end_slot = (data_availability_boundary - 1).end_slot(E::slots_per_epoch());
 
-        // todo(emhane): In the future, if the data availability boundary is less than the split
-        // epoch, this code will have to change to account for head candidates.
         for res in self.forwards_block_roots_iterator_until(
             oldest_blob_slot,
             end_slot,
             || {
+                // todo(emhane): In the future, if the data availability boundary is less than
+                // the split (finalized) epoch, this code will have to change to decide what to
+                // do with pruned blobs in our not-yet-finalized canonical chain and
+                // not-yet-orphaned forks (see DBColumn::BeaconBlobOrphan).
+                //
+                // Related to this review and the spec PRs linked in it:
+                // https://github.com/sigp/lighthouse/pull/3852#pullrequestreview-1244785136
                 let split = self.get_split_info();
                 let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
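
The `end_slot` change in the last hunk is the main bug fix in this file: pruning previously ran through the end of the data availability boundary epoch itself, even though blobs in that epoch must stay available. A back-of-the-envelope check of the slot arithmetic, with `u64` standing in for Lighthouse's `Epoch`/`Slot` types and the mainnet value of 32 slots per epoch assumed:

    // Shows the off-by-one-epoch fix: pruning must stop at the last slot of
    // the epoch *before* the data availability boundary, so that every blob
    // in the boundary epoch itself remains available.
    const SLOTS_PER_EPOCH: u64 = 32;

    /// Last slot of `epoch` (inclusive).
    fn end_slot(epoch: u64) -> u64 {
        (epoch + 1) * SLOTS_PER_EPOCH - 1
    }

    fn main() {
        let data_availability_boundary = 10;
        // Old (buggy) bound: pruned into the boundary epoch itself.
        let old_end = end_slot(data_availability_boundary);
        // New bound: stops one epoch earlier, keeping the boundary epoch intact.
        let new_end = end_slot(data_availability_boundary - 1);
        assert_eq!(old_end, 351);
        assert_eq!(new_end, 319);
        // The first slot that must stay available is the boundary epoch's start slot.
        assert_eq!(new_end + 1, data_availability_boundary * SLOTS_PER_EPOCH);
    }
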