Plug in running blob pruning in migrator, related bug fixes and add todos
parent d1b75e281f
commit d7fc24a9d5
@@ -3015,9 +3015,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
         ops.push(StoreOp::PutState(block.state_root(), &state));
 
+        // Only store blobs at the data availability boundary or younger.
+        //
+        // todo(emhane): Should we add a marginal of one epoch here to ensure data availability
+        // consistency across network at epoch boundaries?
         let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
-
-        // Only store blobs that haven't passed the data availability boundary.
         if Some(block_epoch) >= self.data_availability_boundary() {
             if let Some(blobs) = blobs? {
                 if blobs.blobs.len() > 0 {
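
Note on the guard above: `Some(block_epoch) >= self.data_availability_boundary()` leans on Rust's derived ordering for `Option`, where `None` sorts below every `Some`. A minimal sketch of that behaviour (the `Epoch` alias here is a stand-in for the consensus newtype, not the real Lighthouse type):

type Epoch = u64; // assumption: stand-in for the consensus `Epoch` newtype

fn should_store_blobs(block_epoch: Epoch, boundary: Option<Epoch>) -> bool {
    // Mirrors `Some(block_epoch) >= self.data_availability_boundary()`.
    Some(block_epoch) >= boundary
}

fn main() {
    assert!(should_store_blobs(101, Some(100))); // at/after the boundary: store
    assert!(!should_store_blobs(99, Some(100))); // older than the boundary: skip
    // With no boundary (`None`, e.g. pre-fork) the comparison is vacuously
    // true, because `None` orders below every `Some` value.
    assert!(should_store_blobs(0, None));
}
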
@@ -915,19 +915,9 @@ where
         }
 
         // Prune blobs sidecars older than the blob data availability boundary in the background.
-        if beacon_chain.store.get_config().prune_blobs {
-            let store = beacon_chain.store.clone();
-            let log = log.clone();
-            let data_availability_boundary = beacon_chain.data_availability_boundary();
-            beacon_chain.task_executor.spawn_blocking(
-                move || {
-                    if let Err(e) = store.try_prune_blobs(false, data_availability_boundary) {
-                        error!(log, "Error pruning blobs in background"; "error" => ?e);
-                    }
-                },
-                "prune_blobs_background",
-            );
-        }
+        beacon_chain
+            .store_migrator
+            .process_prune_blobs(beacon_chain.data_availability_boundary());
 
         Ok(beacon_chain)
     }
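
The builder now defers pruning to the store migrator via `process_prune_blobs` instead of spawning a one-off blocking task. The sketch below shows the general shape such a hand-off can take: a queue drained by a long-lived worker thread, so prunes are serialized. This is an illustration only, not the actual Lighthouse `BackgroundMigrator` internals:

use std::sync::mpsc;
use std::thread;

type Epoch = u64; // stand-in for the consensus `Epoch` newtype

enum Notification {
    PruneBlobs { data_availability_boundary: Option<Epoch> },
}

struct Migrator {
    tx: mpsc::Sender<Notification>,
}

impl Migrator {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel();
        // A single long-lived worker drains the queue, so at most one prune
        // runs at a time, unlike independently spawned blocking tasks.
        thread::spawn(move || {
            for msg in rx {
                match msg {
                    Notification::PruneBlobs { data_availability_boundary } => {
                        // The real worker would call `store.try_prune_blobs(..)`
                        // here and log errors instead of propagating them.
                        println!("pruning blobs up to {:?}", data_availability_boundary);
                    }
                }
            }
        });
        Self { tx }
    }

    fn process_prune_blobs(&self, data_availability_boundary: Option<Epoch>) {
        // Fire-and-forget: the caller never blocks on pruning.
        let _ = self.tx.send(Notification::PruneBlobs { data_availability_boundary });
    }
}

fn main() {
    let migrator = Migrator::spawn();
    migrator.process_prune_blobs(Some(100));
    // Give the worker a moment before the process exits (demo only).
    thread::sleep(std::time::Duration::from_millis(50));
}
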
@@ -751,6 +751,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // Drop the old cache head nice and early to try and free the memory as soon as possible.
         drop(old_cached_head);
 
+        // Prune blobs in the background.
+        self.store_migrator
+            .process_prune_blobs(self.data_availability_boundary());
+
         // If the finalized checkpoint changed, perform some updates.
         //
         // The `after_finalization` function will take a write-lock on `fork_choice`, therefore it
@@ -1014,21 +1018,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // Take a write-lock on the canonical head and signal for it to prune.
         self.canonical_head.fork_choice_write_lock().prune()?;
 
-        // Prune blobs.
-        if self.store.get_config().prune_blobs {
-            let store = self.store.clone();
-            let log = self.log.clone();
-            let data_availability_boundary = self.data_availability_boundary();
-            self.task_executor.spawn_blocking(
-                move || {
-                    if let Err(e) = store.try_prune_blobs(false, data_availability_boundary) {
-                        error!(log, "Error pruning blobs in background"; "error" => ?e);
-                    }
-                },
-                "prune_blobs_background",
-            );
-        }
-
         Ok(())
     }
 
@@ -1367,7 +1367,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     ///
     /// Return an `BlobInfoConcurrentMutation` error if the `prev_value` provided
     /// is not correct.
-    pub fn compare_and_set_blob_info(
+    fn compare_and_set_blob_info(
         &self,
         prev_value: Option<BlobInfo>,
         new_value: Option<BlobInfo>,
@@ -1383,7 +1383,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     }
 
     /// As for `compare_and_set_blob_info`, but also writes the blob info to disk immediately.
-    pub fn compare_and_set_blob_info_with_write(
+    fn compare_and_set_blob_info_with_write(
         &self,
         prev_value: Option<BlobInfo>,
         new_value: Option<BlobInfo>,
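
Both setters drop from `pub fn` to `fn`, making the compare-and-set pair an internal detail of the store. The pattern itself is optimistic concurrency control: the caller passes back the `prev_value` it read, and the write only lands if that still matches. A minimal sketch of the idea over an in-memory value (assumption: the real method checks against the store's cached copy and, in the `_with_write` variant, also persists to disk):

use std::sync::Mutex;

#[derive(Clone, Debug, PartialEq)]
struct BlobInfo {
    oldest_blob_slot: u64, // stand-in for the `Slot` newtype
}

#[derive(Debug)]
enum Error {
    BlobInfoConcurrentMutation,
}

struct Store {
    blob_info: Mutex<Option<BlobInfo>>,
}

impl Store {
    /// Optimistic update: succeeds only if `prev_value` still matches what
    /// the store holds; otherwise another writer got there first.
    fn compare_and_set_blob_info(
        &self,
        prev_value: Option<BlobInfo>,
        new_value: Option<BlobInfo>,
    ) -> Result<(), Error> {
        let mut guard = self.blob_info.lock().unwrap();
        if *guard == prev_value {
            *guard = new_value;
            Ok(())
        } else {
            Err(Error::BlobInfoConcurrentMutation)
        }
    }
}

fn main() {
    let store = Store { blob_info: Mutex::new(None) };
    let new = Some(BlobInfo { oldest_blob_slot: 6_400_000 });
    // First writer succeeds; a second writer holding the stale `prev_value` fails.
    store.compare_and_set_blob_info(None, new.clone()).unwrap();
    assert!(store.compare_and_set_blob_info(None, None).is_err());
}
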
@@ -1751,6 +1751,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             }
         };
 
+        let should_prune_blobs = self.get_config().prune_blobs;
+        if !should_prune_blobs && !force {
+            debug!(self.log, "Blob pruning is disabled";
+                "prune_blobs" => should_prune_blobs
+            );
+            return Ok(());
+        }
+
         let blob_info = || -> BlobInfo {
             if let Some(blob_info) = self.get_blob_info() {
                 if blob_info.oldest_blob_slot.epoch(E::slots_per_epoch()) >= eip4844_fork {
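
The new guard gives `try_prune_blobs` two entry points: routine background calls pass `force = false` and become no-ops when blob pruning is disabled in config, while a forced call still runs. A tiny truth-table sketch of the guard (assuming `force` is the first parameter, as at the call sites above):

/// Mirrors `if !should_prune_blobs && !force { return Ok(()); }`.
fn prune_runs(prune_blobs_config: bool, force: bool) -> bool {
    prune_blobs_config || force
}

fn main() {
    assert!(prune_runs(true, false)); // config-driven background prune
    assert!(prune_runs(false, true)); // forced prune overrides the config
    assert!(prune_runs(true, true));
    assert!(!prune_runs(false, false)); // disabled and not forced: early return
}
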
@@ -1758,14 +1766,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 }
             }
             // If BlobInfo is uninitialized this is probably the first time pruning blobs, or
-            // maybe oldest_blob_info has been initialized with Epoch::default.
-            // start from the eip4844 fork epoch. No new blobs are imported into the beacon
-            // chain that are older than the data availability boundary.
+            // maybe oldest_blob_info has been initialized with Epoch::default. Start from the
+            // eip4844 fork epoch.
             BlobInfo {
                 oldest_blob_slot: eip4844_fork.start_slot(E::slots_per_epoch()),
             }
         }();
 
+        // todo(emhane): Should we add a marginal for how old blobs we import? If so needs to be
+        // reflected here when choosing which oldest slot to prune from.
         let oldest_blob_slot = blob_info.oldest_blob_slot;
         // The last entirely pruned epoch, blobs sidecar pruning may have stopped early in the
         // middle of an epoch otherwise the oldest blob slot is a start slot.
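
When no `BlobInfo` has been persisted yet, pruning starts from the first slot of the eip4844 fork epoch. `epoch.start_slot(slots_per_epoch)` is plain multiplication; a worked sketch (32 slots per epoch as on mainnet, fork epoch hypothetical):

// Stand-in for `Epoch::start_slot` on the consensus newtypes.
fn start_slot(epoch: u64, slots_per_epoch: u64) -> u64 {
    epoch * slots_per_epoch
}

fn main() {
    let slots_per_epoch = 32; // mainnet value
    let eip4844_fork_epoch = 200_000; // hypothetical fork epoch
    // Default `oldest_blob_slot` on first prune:
    assert_eq!(start_slot(eip4844_fork_epoch, slots_per_epoch), 6_400_000);
}
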
@@ -1789,14 +1798,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
 
         let mut ops = vec![];
         let mut last_pruned_block_root = None;
-        let end_slot = data_availability_boundary.end_slot(E::slots_per_epoch());
+        // Prune up until the data availability boundary.
+        let end_slot = (data_availability_boundary - 1).end_slot(E::slots_per_epoch());
 
-        // todo(emhane): In the future, if the data availability boundary is less than the split
-        // epoch, this code will have to change to account for head candidates.
         for res in self.forwards_block_roots_iterator_until(
             oldest_blob_slot,
             end_slot,
             || {
+                // todo(emhane): In the future, if the data availability boundary is less than the
+                // split (finalized) epoch, this code will have to change to decide what to do
+                // with pruned blobs in our not-yet-finalized canonical chain and not-yet-orphaned
+                // forks (see DBColumn::BeaconBlobOrphan).
+                //
+                // Related to review and the spec PRs linked in it:
+                // https://github.com/sigp/lighthouse/pull/3852#pullrequestreview-1244785136
                 let split = self.get_split_info();
 
                 let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
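
The `end_slot` change is the off-by-one fix in this commit: blobs in the data availability boundary epoch itself must stay retrievable, so pruning now stops at the last slot of the epoch before it. With `end_slot(epoch) = (epoch + 1) * slots_per_epoch - 1`, a worked example (32 slots per epoch, boundary epoch hypothetical):

// Stand-in for `Epoch::end_slot` on the consensus newtypes.
fn end_slot(epoch: u64, slots_per_epoch: u64) -> u64 {
    (epoch + 1) * slots_per_epoch - 1
}

fn main() {
    let slots_per_epoch = 32;
    let boundary = 100_u64; // hypothetical data availability boundary epoch
    // Old bound pruned through the boundary epoch itself (its last slot, 3231):
    assert_eq!(end_slot(boundary, slots_per_epoch), 3231);
    // Fixed bound stops at the end of epoch 99, keeping epoch 100 available:
    assert_eq!(end_slot(boundary - 1, slots_per_epoch), 3199);
}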