Simplify conceptual design

This commit is contained in:
Emilia Hane 2023-01-16 20:53:09 +01:00
parent 0d13932663
commit 7103a257ce
No known key found for this signature in database
GPG Key ID: E73394F9C09206FA
5 changed files with 66 additions and 91 deletions

View File

@ -918,9 +918,13 @@ where
if beacon_chain.store.get_config().prune_blobs { if beacon_chain.store.get_config().prune_blobs {
let store = beacon_chain.store.clone(); let store = beacon_chain.store.clone();
let log = log.clone(); let log = log.clone();
let current_slot = beacon_chain
.slot()
.map_err(|e| format!("Failed to get current slot: {:?}", e))?;
let current_epoch = current_slot.epoch(TEthSpec::slots_per_epoch());
beacon_chain.task_executor.spawn_blocking( beacon_chain.task_executor.spawn_blocking(
move || { move || {
if let Err(e) = store.try_prune_blobs(false) { if let Err(e) = store.try_prune_blobs(false, Some(current_epoch)) {
error!(log, "Error pruning blobs in background"; "error" => ?e); error!(log, "Error pruning blobs in background"; "error" => ?e);
} }
}, },

View File

@ -54,9 +54,8 @@ use slog::{crit, debug, error, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use store::{iter::StateRootsIterator, KeyValueStoreOp, Split, StoreItem}; use store::{iter::StateRootsIterator, KeyValueStoreOp, StoreItem};
use task_executor::{JoinHandle, ShutdownReason}; use task_executor::{JoinHandle, ShutdownReason};
use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
use types::*; use types::*;
/// Simple wrapper around `RwLock` that uses private visibility to prevent any other modules from /// Simple wrapper around `RwLock` that uses private visibility to prevent any other modules from
@ -794,51 +793,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.execution_status .execution_status
.is_optimistic_or_invalid(); .is_optimistic_or_invalid();
if self.store.get_config().prune_blobs {
let current_slot = self.slot()?;
let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
if Some(current_epoch)
> self.spec.eip4844_fork_epoch.map(|eip4844_fork_epoch| {
eip4844_fork_epoch + *MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS
})
{
let current_epoch_start_slot =
current_epoch.start_slot(T::EthSpec::slots_per_epoch());
// Update db's metadata for blobs pruning.
if current_slot == current_epoch_start_slot {
if let Some(mut blob_info) = self.store.get_blob_info() {
if let Some(data_availability_boundary) = self.data_availability_boundary()
{
let dab_slot =
data_availability_boundary.end_slot(T::EthSpec::slots_per_epoch());
if let Some(dab_state_root) = self.state_root_at_slot(dab_slot)? {
blob_info.data_availability_boundary =
Split::new(dab_slot, dab_state_root);
self.store.compare_and_set_blob_info_with_write(
self.store.get_blob_info(),
Some(blob_info),
)?;
}
}
}
}
}
let store = self.store.clone();
let log = self.log.clone();
self.task_executor.spawn_blocking(
move || {
if let Err(e) = store.try_prune_blobs(false) {
error!(log, "Error pruning blobs in background"; "error" => ?e);
}
},
"prune_blobs_background",
);
}
// Detect and potentially report any re-orgs. // Detect and potentially report any re-orgs.
let reorg_distance = detect_reorg( let reorg_distance = detect_reorg(
&old_snapshot.beacon_state, &old_snapshot.beacon_state,
@ -1060,6 +1014,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Take a write-lock on the canonical head and signal for it to prune. // Take a write-lock on the canonical head and signal for it to prune.
self.canonical_head.fork_choice_write_lock().prune()?; self.canonical_head.fork_choice_write_lock().prune()?;
// Prune blobs.
if self.store.get_config().prune_blobs {
let store = self.store.clone();
let log = self.log.clone();
let current_slot = self.slot()?;
let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
self.task_executor.spawn_blocking(
move || {
if let Err(e) = store.try_prune_blobs(false, Some(current_epoch)) {
error!(log, "Error pruning blobs in background"; "error" => ?e);
}
},
"prune_blobs_background",
);
}
Ok(()) Ok(())
} }

View File

@ -38,6 +38,7 @@ use std::marker::PhantomData;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
use types::*; use types::*;
/// On-disk database that stores finalized states efficiently. /// On-disk database that stores finalized states efficiently.
@ -80,6 +81,7 @@ pub enum HotColdDBError {
target_version: SchemaVersion, target_version: SchemaVersion,
current_version: SchemaVersion, current_version: SchemaVersion,
}, },
UnsupportedDataAvailabilityBoundary,
/// Recoverable error indicating that the database freeze point couldn't be updated /// Recoverable error indicating that the database freeze point couldn't be updated
/// due to the finalized block not lying on an epoch boundary (should be infrequent). /// due to the finalized block not lying on an epoch boundary (should be infrequent).
FreezeSlotUnaligned(Slot), FreezeSlotUnaligned(Slot),
@ -94,7 +96,6 @@ pub enum HotColdDBError {
MissingHotStateSummary(Hash256), MissingHotStateSummary(Hash256),
MissingEpochBoundaryState(Hash256), MissingEpochBoundaryState(Hash256),
MissingSplitState(Hash256, Slot), MissingSplitState(Hash256, Slot),
MissingStateToPruneBlobs(Hash256),
MissingExecutionPayload(Hash256), MissingExecutionPayload(Hash256),
MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
MissingAnchorInfo, MissingAnchorInfo,
@ -217,15 +218,13 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
} }
if let Some(blob_info) = db.load_blob_info()? { if let Some(blob_info) = db.load_blob_info()? {
let dab_slot = blob_info.data_availability_boundary.slot; let oldest_blob_slot = blob_info.oldest_blob_slot;
let dab_state_root = blob_info.data_availability_boundary.state_root;
*db.blob_info.write() = Some(blob_info); *db.blob_info.write() = Some(blob_info);
info!( info!(
db.log, db.log,
"Blob info loaded from disk"; "Blob info loaded from disk";
"data_availability_boundary_slot" => dab_slot, "oldest_blob_slot" => oldest_blob_slot,
"data_availability_boundary_state" => ?dab_state_root,
); );
} }
@ -1375,7 +1374,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
*blob_info = new_value; *blob_info = new_value;
Ok(kv_op) Ok(kv_op)
} else { } else {
Err(Error::AnchorInfoConcurrentMutation) Err(Error::BlobInfoConcurrentMutation)
} }
} }
@ -1713,25 +1712,42 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
/// Try to prune blobs older than the data availability boundary. /// Try to prune blobs older than the data availability boundary.
pub fn try_prune_blobs(&self, force: bool) -> Result<(), Error> { pub fn try_prune_blobs(
&self,
force: bool,
data_availability_boundary: Option<Epoch>,
) -> Result<(), Error> {
let blob_info = match self.get_blob_info() { let blob_info = match self.get_blob_info() {
Some(old_blob_info) => old_blob_info, Some(blob_info) => blob_info,
None => { None => {
return Ok(()); return Ok(());
} }
}; };
let oldest_blob_slot = blob_info.oldest_blob_slot;
// The last entirely pruned epoch, blobs sidecar pruning may have stopped early in the
// middle of an epoch.
let last_pruned_epoch = oldest_blob_slot.epoch(E::slots_per_epoch()) - 1;
let next_epoch_to_prune = match data_availability_boundary {
Some(epoch) => epoch,
None => {
// The split slot is set upon finalization and is the first slot in the latest
// finalized epoch, hence current_epoch = split_epoch + 1
let current_epoch =
self.get_split_slot().epoch(E::slots_per_epoch()) + Epoch::new(1);
current_epoch - *MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS
}
};
if !force { if !force {
if blob_info.last_pruned_epoch.as_u64() + self.get_config().epochs_per_blob_prune if last_pruned_epoch.as_u64() + self.get_config().epochs_per_blob_prune
> blob_info.next_epoch_to_prune.as_u64() > next_epoch_to_prune.as_u64()
{ {
info!(self.log, "Blobs sidecars are pruned"); info!(self.log, "Blobs sidecars are pruned");
return Ok(()); return Ok(());
} }
} }
let dab_state_root = blob_info.data_availability_boundary.state_root;
// Iterate block roots backwards to oldest blob slot. // Iterate block roots backwards to oldest blob slot.
warn!( warn!(
self.log, self.log,
@ -1741,18 +1757,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let mut ops = vec![]; let mut ops = vec![];
let mut last_pruned_block_root = None; let mut last_pruned_block_root = None;
let end_slot = next_epoch_to_prune.start_slot(E::slots_per_epoch());
for res in self.forwards_block_roots_iterator_until( for res in self.forwards_block_roots_iterator_until(
blob_info.oldest_blob_slot, oldest_blob_slot,
blob_info.data_availability_boundary.slot, end_slot,
|| { || Err(HotColdDBError::UnsupportedDataAvailabilityBoundary.into()),
let dab_state = self
.get_state(&dab_state_root, None)?
.ok_or(HotColdDBError::MissingStateToPruneBlobs(dab_state_root))?;
let dab_block_root = dab_state.get_latest_block_root(dab_state_root);
Ok((dab_state, dab_block_root))
},
&self.spec, &self.spec,
)? { )? {
let (block_root, slot) = match res { let (block_root, slot) = match res {
@ -1780,7 +1790,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
ops.push(StoreOp::DeleteBlobs(block_root)); ops.push(StoreOp::DeleteBlobs(block_root));
} }
if slot <= blob_info.oldest_blob_slot { if slot >= end_slot {
info!( info!(
self.log, self.log,
"Blobs sidecar pruning reached earliest available blobs sidecar"; "Blobs sidecar pruning reached earliest available blobs sidecar";
@ -1798,12 +1808,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
"blobs_sidecars_pruned" => blobs_sidecars_pruned, "blobs_sidecars_pruned" => blobs_sidecars_pruned,
); );
if let Some(mut new_blob_info) = self.get_blob_info() { self.compare_and_set_blob_info_with_write(
new_blob_info.last_pruned_epoch = Some(blob_info),
(blob_info.data_availability_boundary.slot + 1).epoch(E::slots_per_epoch()); Some(BlobInfo {
new_blob_info.oldest_blob_slot = blob_info.data_availability_boundary.slot + 1; oldest_blob_slot: end_slot + 1,
self.compare_and_set_blob_info_with_write(self.get_blob_info(), Some(new_blob_info))?; }),
} )?;
Ok(()) Ok(())
} }
@ -1843,9 +1853,6 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot()).into()); return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot()).into());
} }
// Prune blobs before migration.
store.try_prune_blobs(true)?;
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new(); let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
// 1. Copy all of the states between the head and the split slot, from the hot DB // 1. Copy all of the states between the head and the split slot, from the hot DB

View File

@ -1,8 +1,8 @@
use crate::{DBColumn, Error, Split, StoreItem}; use crate::{DBColumn, Error, StoreItem};
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
use types::{Checkpoint, Epoch, Hash256, Slot}; use types::{Checkpoint, Hash256, Slot};
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(15); pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(15);
@ -122,12 +122,6 @@ impl StoreItem for AnchorInfo {
/// Database parameters relevant to blob sync. /// Database parameters relevant to blob sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)] #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct BlobInfo { pub struct BlobInfo {
/// The latest epoch that blobs were pruned.
pub last_pruned_epoch: Epoch,
/// The next epoch to prune blobs from.
pub next_epoch_to_prune: Epoch,
/// The state root and slot of the next blobs to prune from.
pub data_availability_boundary: Split,
/// The slot before which blobs are available. /// The slot before which blobs are available.
pub oldest_blob_slot: Slot, pub oldest_blob_slot: Slot,
} }

View File

@ -312,9 +312,9 @@ pub fn prune_blobs<E: EthSpec>(
log, log,
)?; )?;
// If we're triggering a prune manually then ignore the check on the split's parent that bails // If we're triggering a prune manually then ignore the check on `epochs_per_blob_prune` that
// out early by passing true to the force parameter. // bails out early by passing true to the force parameter.
db.try_prune_blobs(true) db.try_prune_blobs(true, None)
} }
/// Run the database manager, returning an error string if the operation did not succeed. /// Run the database manager, returning an error string if the operation did not succeed.