Prune blobs before data availability breakpoint

This commit is contained in:
Emilia Hane 2023-01-06 18:29:42 +01:00
parent e2a6da4274
commit 7bf88c2336
No known key found for this signature in database
GPG Key ID: E73394F9C09206FA
4 changed files with 66 additions and 40 deletions

View File

@ -3021,6 +3021,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
info!(self.log, "Writing blobs to store"; "block_root" => ?block_root); info!(self.log, "Writing blobs to store"; "block_root" => ?block_root);
ops.push(StoreOp::PutBlobs(block_root, blobs)); ops.push(StoreOp::PutBlobs(block_root, blobs));
} }
// Update the database's metadata for blob pruning.
if current_slot == current_epoch.start_slot(T::EthSpec::slots_per_epoch()) {
if let Some(mut blob_info) = self.store.get_blob_info() {
let next_epoch_to_prune =
blob_info.last_pruned_epoch + *MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
if current_epoch > next_epoch_to_prune {
blob_info.availability_breakpoint = block_root;
self.store.compare_and_set_blob_info_with_write(
self.store.get_blob_info(),
Some(blob_info),
)?;
}
}
}
}; };
let txn_lock = self.store.hot_db.begin_rw_transaction(); let txn_lock = self.store.hot_db.begin_rw_transaction();

View File

@ -94,6 +94,7 @@ pub enum HotColdDBError {
MissingHotStateSummary(Hash256), MissingHotStateSummary(Hash256),
MissingEpochBoundaryState(Hash256), MissingEpochBoundaryState(Hash256),
MissingSplitState(Hash256, Slot), MissingSplitState(Hash256, Slot),
MissingStateToPruneBlobs(Hash256, Slot),
MissingExecutionPayload(Hash256), MissingExecutionPayload(Hash256),
MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
MissingAnchorInfo, MissingAnchorInfo,
@ -1687,41 +1688,50 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
pub fn try_prune_blobs(&self, force: bool) -> Result<(), Error> { pub fn try_prune_blobs(&self, force: bool) -> Result<(), Error> {
let split = self.get_split_info(); let mut blob_info: BlobInfo;
if split.slot == 0 { if let Some(old_blob_info) = self.get_blob_info() {
blob_info = old_blob_info;
} else {
return Ok(()); return Ok(());
} }
let eip4844_fork_slot = if let Some(epoch) = self.spec.eip4844_fork_epoch { if blob_info.availability_breakpoint == blob_info.oldest_blob_parent {
epoch.start_slot(E::slots_per_epoch())
} else {
return Ok(()); return Ok(());
}; }
// Load the split state so we can backtrack to find blobs sidecars. // Load the state from which to prune blobs so we can backtrack.
// todo(emhane): MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS let erase_state = self
let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or( .get_state(
HotColdDBError::MissingSplitState(split.state_root, split.slot), &blob_info.availability_breakpoint,
)?; Some(blob_info.last_pruned_epoch.end_slot(E::slots_per_epoch())),
)?
.ok_or(HotColdDBError::MissingStateToPruneBlobs(
blob_info.availability_breakpoint,
blob_info.oldest_blob_slot,
))?;
// The data availability breakpoint is set at the start of an epoch, indicating that the
// preceding epoch can be pruned.
let erase_epoch = erase_state.current_epoch() - 1;
let erase_slot = erase_epoch.end_slot(E::slots_per_epoch());
// The finalized block may or may not have its blobs sidecar stored, depending on // The finalized block may or may not have its blobs sidecar stored, depending on
// whether it was at a skipped slot. However for a fully pruned database its parent // whether it was at a skipped slot. However for a fully pruned database its parent
// should *always* have been pruned. In case of a long split (no parent found) we // should *always* have been pruned. In the case of blobs sidecars we look at the next
// continue as if the payloads are pruned, as the node probably has other things to worry // parent block with at least one kzg commitment.
// about.
let split_block_root = split_state.get_latest_block_root(split.state_root);
let already_pruned = let already_pruned = process_results(
process_results(split_state.rev_iter_block_roots(&self.spec), |mut iter| { BlockRootsIter::new(&erase_state, blob_info.oldest_blob_slot),
iter.find(|(_, block_root)| { |mut iter| {
iter.find(|(slot, block_root)| {
move || -> bool { move || -> bool {
if *block_root != split_block_root { if *slot <= erase_slot {
if let Ok(Some(split_parent_block)) = if let Ok(Some(erase_parent_block)) =
self.get_blinded_block(&block_root) self.get_blinded_block(&block_root)
{ {
if let Ok(expected_kzg_commitments) = if let Ok(expected_kzg_commitments) =
split_parent_block.message().body().blob_kzg_commitments() erase_parent_block.message().body().blob_kzg_commitments()
{ {
if expected_kzg_commitments.len() > 0 { if expected_kzg_commitments.len() > 0 {
return true; return true;
@ -1736,28 +1746,25 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.blobs_sidecar_exists(&split_parent_root) self.blobs_sidecar_exists(&split_parent_root)
.map(|exists| !exists) .map(|exists| !exists)
}) })
})??; },
)??;
if already_pruned && !force { if already_pruned && !force {
info!(self.log, "Blobs sidecars are pruned"); info!(self.log, "Blobs sidecars are pruned");
return Ok(()); return Ok(());
} }
// Iterate block roots backwards to the Eip4844 fork or the latest blob slot, whichever // Iterate block roots backwards to the oldest blob slot.
// comes first.
warn!( warn!(
self.log, self.log,
"Pruning finalized blobs sidecars"; "Pruning blobs sidecars stored longer than data availability boundary";
"info" => "you may notice degraded I/O performance while this runs" "info" => "you may notice degraded I/O performance while this runs"
); );
let latest_blob_slot = self.get_blob_info().map(|info| info.latest_blob_slot);
let mut ops = vec![]; let mut ops = vec![];
let mut last_pruned_block_root = None; let mut last_pruned_block_root = None;
for res in std::iter::once(Ok((split_block_root, split.slot))) for res in BlockRootsIterator::new(self, &erase_state) {
.chain(BlockRootsIterator::new(self, &split_state))
{
let (block_root, slot) = match res { let (block_root, slot) = match res {
Ok(tuple) => tuple, Ok(tuple) => tuple,
Err(e) => { Err(e) => {
@ -1770,14 +1777,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
}; };
if slot < eip4844_fork_slot {
info!(
self.log,
"Blobs sidecar pruning reached Eip4844 boundary";
);
break;
}
if Some(block_root) != last_pruned_block_root if Some(block_root) != last_pruned_block_root
&& self.blobs_sidecar_exists(&block_root)? && self.blobs_sidecar_exists(&block_root)?
{ {
@ -1791,15 +1790,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
ops.push(StoreOp::DeleteBlobs(block_root)); ops.push(StoreOp::DeleteBlobs(block_root));
} }
if Some(slot) == latest_blob_slot { if slot <= erase_slot {
info!( info!(
self.log, self.log,
"Blobs sidecar pruning reached anchor state"; "Blobs sidecar pruning reached earliest available blob state";
"slot" => slot "slot" => slot
); );
break; break;
} }
} }
let blobs_sidecars_pruned = ops.len(); let blobs_sidecars_pruned = ops.len();
self.do_atomically(ops)?; self.do_atomically(ops)?;
info!( info!(
@ -1807,6 +1807,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
"Blobs sidecar pruning complete"; "Blobs sidecar pruning complete";
"blobs_sidecars_pruned" => blobs_sidecars_pruned, "blobs_sidecars_pruned" => blobs_sidecars_pruned,
); );
blob_info.last_pruned_epoch = erase_epoch;
blob_info.oldest_blob_parent = blob_info.availability_breakpoint;
self.compare_and_set_blob_info_with_write(self.get_blob_info(), Some(blob_info))?;
Ok(()) Ok(())
} }
} }

View File

@ -35,6 +35,7 @@ pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split};
pub use self::leveldb_store::LevelDB; pub use self::leveldb_store::LevelDB;
pub use self::memory_store::MemoryStore; pub use self::memory_store::MemoryStore;
pub use self::partial_beacon_state::PartialBeaconState; pub use self::partial_beacon_state::PartialBeaconState;
pub use crate::metadata::BlobInfo;
pub use errors::Error; pub use errors::Error;
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
pub use metadata::AnchorInfo; pub use metadata::AnchorInfo;

View File

@ -2,7 +2,7 @@ use crate::{DBColumn, Error, StoreItem};
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
use types::{Checkpoint, Hash256, Slot}; use types::{Checkpoint, Epoch, Hash256, Slot};
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(15); pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(15);
@ -122,6 +122,10 @@ impl StoreItem for AnchorInfo {
/// Database parameters relevant to blob sync. /// Database parameters relevant to blob sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)]
pub struct BlobInfo { pub struct BlobInfo {
/// The latest epoch for which blobs were pruned.
pub last_pruned_epoch: Epoch,
/// The block root of the next blobs to prune from.
pub availability_breakpoint: Hash256,
/// The block root of the next blob that needs to be added to fill in the history. /// The block root of the next blob that needs to be added to fill in the history.
pub oldest_blob_parent: Hash256, pub oldest_blob_parent: Hash256,
/// The slot before which blobs are available. /// The slot before which blobs are available.