Boiler plate code for blobs pruning
This commit is contained in:
parent
a42d07592c
commit
e2a6da4274
@ -477,6 +477,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
.map(|payload| payload.is_some())
|
.map(|payload| payload.is_some())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if the blobs sidecar for a block exists on disk.
|
||||||
|
pub fn blobs_sidecar_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
|
||||||
|
self.get_item::<BlobInfo>(block_root)
|
||||||
|
.map(|blobs| blobs.is_some())
|
||||||
|
}
|
||||||
|
|
||||||
/// Determine whether a block exists in the database.
|
/// Determine whether a block exists in the database.
|
||||||
pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
|
pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
|
||||||
self.hot_db
|
self.hot_db
|
||||||
@ -777,6 +783,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
|
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
StoreOp::DeleteBlobs(block_root) => {
|
||||||
|
let key = get_key_for_col(DBColumn::BeaconBlob.into(), block_root.as_bytes());
|
||||||
|
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
|
||||||
|
}
|
||||||
|
|
||||||
StoreOp::DeleteState(state_root, slot) => {
|
StoreOp::DeleteState(state_root, slot) => {
|
||||||
let state_summary_key =
|
let state_summary_key =
|
||||||
get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes());
|
get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes());
|
||||||
@ -826,6 +837,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
guard.pop(block_root);
|
guard.pop(block_root);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
StoreOp::DeleteBlobs(block_root) => {
|
||||||
|
guard_blob.pop(block_root);
|
||||||
|
}
|
||||||
|
|
||||||
StoreOp::DeleteState(_, _) => (),
|
StoreOp::DeleteState(_, _) => (),
|
||||||
|
|
||||||
StoreOp::DeleteExecutionPayload(_) => (),
|
StoreOp::DeleteExecutionPayload(_) => (),
|
||||||
@ -835,6 +850,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
self.hot_db
|
self.hot_db
|
||||||
.do_atomically(self.convert_to_kv_batch(batch)?)?;
|
.do_atomically(self.convert_to_kv_batch(batch)?)?;
|
||||||
drop(guard);
|
drop(guard);
|
||||||
|
drop(guard_blob);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -1669,6 +1685,130 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
);
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn try_prune_blobs(&self, force: bool) -> Result<(), Error> {
|
||||||
|
let split = self.get_split_info();
|
||||||
|
|
||||||
|
if split.slot == 0 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let eip4844_fork_slot = if let Some(epoch) = self.spec.eip4844_fork_epoch {
|
||||||
|
epoch.start_slot(E::slots_per_epoch())
|
||||||
|
} else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
// Load the split state so we can backtrack to find blobs sidecars.
|
||||||
|
// todo(emhane): MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS
|
||||||
|
let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
|
||||||
|
HotColdDBError::MissingSplitState(split.state_root, split.slot),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
// The finalized block may or may not have its blobs sidecar stored, depending on
|
||||||
|
// whether it was at a skipped slot. However for a fully pruned database its parent
|
||||||
|
// should *always* have been pruned. In case of a long split (no parent found) we
|
||||||
|
// continue as if the payloads are pruned, as the node probably has other things to worry
|
||||||
|
// about.
|
||||||
|
let split_block_root = split_state.get_latest_block_root(split.state_root);
|
||||||
|
|
||||||
|
let already_pruned =
|
||||||
|
process_results(split_state.rev_iter_block_roots(&self.spec), |mut iter| {
|
||||||
|
iter.find(|(_, block_root)| {
|
||||||
|
move || -> bool {
|
||||||
|
if *block_root != split_block_root {
|
||||||
|
if let Ok(Some(split_parent_block)) =
|
||||||
|
self.get_blinded_block(&block_root)
|
||||||
|
{
|
||||||
|
if let Ok(expected_kzg_commitments) =
|
||||||
|
split_parent_block.message().body().blob_kzg_commitments()
|
||||||
|
{
|
||||||
|
if expected_kzg_commitments.len() > 0 {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}()
|
||||||
|
})
|
||||||
|
.map_or(Ok(true), |(_, split_parent_root)| {
|
||||||
|
self.blobs_sidecar_exists(&split_parent_root)
|
||||||
|
.map(|exists| !exists)
|
||||||
|
})
|
||||||
|
})??;
|
||||||
|
|
||||||
|
if already_pruned && !force {
|
||||||
|
info!(self.log, "Blobs sidecars are pruned");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate block roots backwards to the Eip48444 fork or the latest blob slot, whichever
|
||||||
|
// comes first.
|
||||||
|
warn!(
|
||||||
|
self.log,
|
||||||
|
"Pruning finalized blobs sidecars";
|
||||||
|
"info" => "you may notice degraded I/O performance while this runs"
|
||||||
|
);
|
||||||
|
let latest_blob_slot = self.get_blob_info().map(|info| info.latest_blob_slot);
|
||||||
|
|
||||||
|
let mut ops = vec![];
|
||||||
|
let mut last_pruned_block_root = None;
|
||||||
|
|
||||||
|
for res in std::iter::once(Ok((split_block_root, split.slot)))
|
||||||
|
.chain(BlockRootsIterator::new(self, &split_state))
|
||||||
|
{
|
||||||
|
let (block_root, slot) = match res {
|
||||||
|
Ok(tuple) => tuple,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
self.log,
|
||||||
|
"Stopping blobs sidecar pruning early";
|
||||||
|
"error" => ?e,
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if slot < eip4844_fork_slot {
|
||||||
|
info!(
|
||||||
|
self.log,
|
||||||
|
"Blobs sidecar pruning reached Eip4844 boundary";
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if Some(block_root) != last_pruned_block_root
|
||||||
|
&& self.blobs_sidecar_exists(&block_root)?
|
||||||
|
{
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Pruning blobs sidecar";
|
||||||
|
"slot" => slot,
|
||||||
|
"block_root" => ?block_root,
|
||||||
|
);
|
||||||
|
last_pruned_block_root = Some(block_root);
|
||||||
|
ops.push(StoreOp::DeleteBlobs(block_root));
|
||||||
|
}
|
||||||
|
|
||||||
|
if Some(slot) == latest_blob_slot {
|
||||||
|
info!(
|
||||||
|
self.log,
|
||||||
|
"Blobs sidecar pruning reached anchor state";
|
||||||
|
"slot" => slot
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let blobs_sidecars_pruned = ops.len();
|
||||||
|
self.do_atomically(ops)?;
|
||||||
|
info!(
|
||||||
|
self.log,
|
||||||
|
"Blobs sidecar pruning complete";
|
||||||
|
"blobs_sidecars_pruned" => blobs_sidecars_pruned,
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Advance the split point of the store, moving new finalized states to the freezer.
|
/// Advance the split point of the store, moving new finalized states to the freezer.
|
||||||
|
@ -161,6 +161,7 @@ pub enum StoreOp<'a, E: EthSpec> {
|
|||||||
PutStateTemporaryFlag(Hash256),
|
PutStateTemporaryFlag(Hash256),
|
||||||
DeleteStateTemporaryFlag(Hash256),
|
DeleteStateTemporaryFlag(Hash256),
|
||||||
DeleteBlock(Hash256),
|
DeleteBlock(Hash256),
|
||||||
|
DeleteBlobs(Hash256),
|
||||||
DeleteState(Hash256, Option<Slot>),
|
DeleteState(Hash256, Option<Slot>),
|
||||||
DeleteExecutionPayload(Hash256),
|
DeleteExecutionPayload(Hash256),
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user