Make implementation of BlobInfo more coder friendly

This commit is contained in:
Emilia Hane 2023-01-24 17:29:10 +01:00
parent 8f137df02e
commit 00ca21e84c
No known key found for this signature in database
GPG Key ID: E73394F9C09206FA
2 changed files with 29 additions and 48 deletions

View File

@ -55,7 +55,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// The starting slots for the range of blocks & states stored in the database. /// The starting slots for the range of blocks & states stored in the database.
anchor_info: RwLock<Option<AnchorInfo>>, anchor_info: RwLock<Option<AnchorInfo>>,
/// The starting slots for the range of blobs stored in the database. /// The starting slots for the range of blobs stored in the database.
blob_info: RwLock<Option<BlobInfo>>, blob_info: RwLock<BlobInfo>,
pub(crate) config: StoreConfig, pub(crate) config: StoreConfig,
/// Cold database containing compact historical data. /// Cold database containing compact historical data.
pub cold_db: Cold, pub cold_db: Cold,
@ -131,7 +131,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
let db = HotColdDB { let db = HotColdDB {
split: RwLock::new(Split::default()), split: RwLock::new(Split::default()),
anchor_info: RwLock::new(None), anchor_info: RwLock::new(None),
blob_info: RwLock::new(None), blob_info: RwLock::new(BlobInfo::default()),
cold_db: MemoryStore::open(), cold_db: MemoryStore::open(),
hot_db: MemoryStore::open(), hot_db: MemoryStore::open(),
block_cache: Mutex::new(LruCache::new(config.block_cache_size)), block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
@ -166,11 +166,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
let mut db = HotColdDB { let mut db = HotColdDB {
split: RwLock::new(Split::default()), split: RwLock::new(Split::default()),
anchor_info: RwLock::new(None), anchor_info: RwLock::new(None),
blob_info: RwLock::new( blob_info: RwLock::new(BlobInfo::default()),
spec.eip4844_fork_epoch
.is_some()
.then(|| BlobInfo::default()),
),
cold_db: LevelDB::open(cold_path)?, cold_db: LevelDB::open(cold_path)?,
hot_db: LevelDB::open(hot_path)?, hot_db: LevelDB::open(hot_path)?,
block_cache: Mutex::new(LruCache::new(config.block_cache_size)), block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
@ -218,12 +214,12 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
if let Some(blob_info) = db.load_blob_info()? { if let Some(blob_info) = db.load_blob_info()? {
let oldest_blob_slot = blob_info.oldest_blob_slot; let oldest_blob_slot = blob_info.oldest_blob_slot;
*db.blob_info.write() = Some(blob_info); *db.blob_info.write() = blob_info;
info!( info!(
db.log, db.log,
"Blob info loaded from disk"; "Blob info loaded from disk";
"oldest_blob_slot" => oldest_blob_slot, "oldest_blob_slot" => ?oldest_blob_slot,
); );
} }
@ -1357,7 +1353,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Get a clone of the store's blob info. /// Get a clone of the store's blob info.
/// ///
/// To do mutations, use `compare_and_set_blob_info`. /// To do mutations, use `compare_and_set_blob_info`.
pub fn get_blob_info(&self) -> Option<BlobInfo> { pub fn get_blob_info(&self) -> BlobInfo {
self.blob_info.read_recursive().clone() self.blob_info.read_recursive().clone()
} }
@ -1370,8 +1366,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// is not correct. /// is not correct.
fn compare_and_set_blob_info( fn compare_and_set_blob_info(
&self, &self,
prev_value: Option<BlobInfo>, prev_value: BlobInfo,
new_value: Option<BlobInfo>, new_value: BlobInfo,
) -> Result<KeyValueStoreOp, Error> { ) -> Result<KeyValueStoreOp, Error> {
let mut blob_info = self.blob_info.write(); let mut blob_info = self.blob_info.write();
if *blob_info == prev_value { if *blob_info == prev_value {
@ -1386,8 +1382,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// As for `compare_and_set_blob_info`, but also writes the blob info to disk immediately. /// As for `compare_and_set_blob_info`, but also writes the blob info to disk immediately.
fn compare_and_set_blob_info_with_write( fn compare_and_set_blob_info_with_write(
&self, &self,
prev_value: Option<BlobInfo>, prev_value: BlobInfo,
new_value: Option<BlobInfo>, new_value: BlobInfo,
) -> Result<(), Error> { ) -> Result<(), Error> {
let kv_store_op = self.compare_and_set_blob_info(prev_value, new_value)?; let kv_store_op = self.compare_and_set_blob_info(prev_value, new_value)?;
self.hot_db.do_atomically(vec![kv_store_op]) self.hot_db.do_atomically(vec![kv_store_op])
@ -1402,15 +1398,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// ///
/// The argument is intended to be `self.blob_info`, but is passed manually to avoid issues /// The argument is intended to be `self.blob_info`, but is passed manually to avoid issues
/// with recursive locking. /// with recursive locking.
fn store_blob_info_in_batch(&self, blob_info: &Option<BlobInfo>) -> KeyValueStoreOp { fn store_blob_info_in_batch(&self, blob_info: &BlobInfo) -> KeyValueStoreOp {
if let Some(ref blob_info) = blob_info { blob_info.as_kv_store_op(BLOB_INFO_KEY)
blob_info.as_kv_store_op(BLOB_INFO_KEY)
} else {
KeyValueStoreOp::DeleteKey(get_key_for_col(
DBColumn::BeaconMeta.into(),
BLOB_INFO_KEY.as_bytes(),
))
}
} }
/// Return the slot-window describing the available historic states. /// Return the slot-window describing the available historic states.
@ -1760,21 +1749,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Ok(()); return Ok(());
} }
let blob_info = || -> BlobInfo { let blob_info = self.get_blob_info(); // now returns `BlobInfo` not `Option<_>`
if let Some(blob_info) = self.get_blob_info() { let oldest_blob_slot = blob_info
if blob_info.oldest_blob_slot.epoch(E::slots_per_epoch()) >= eip4844_fork { .oldest_blob_slot
return blob_info; .unwrap_or(eip4844_fork.start_slot(E::slots_per_epoch()));
}
}
// If BlobInfo is uninitialized this is probably the first time pruning blobs, or
// maybe oldest_blob_info has been initialized with Epoch::default. Start from the
// eip4844 fork epoch.
BlobInfo {
oldest_blob_slot: eip4844_fork.start_slot(E::slots_per_epoch()),
}
}();
let oldest_blob_slot = blob_info.oldest_blob_slot;
// The last entirely pruned epoch, blobs sidecar pruning may have stopped early in the // The last entirely pruned epoch, blobs sidecar pruning may have stopped early in the
// middle of an epoch otherwise the oldest blob slot is a start slot. // middle of an epoch otherwise the oldest blob slot is a start slot.
let last_pruned_epoch = oldest_blob_slot.epoch(E::slots_per_epoch()) - 1; let last_pruned_epoch = oldest_blob_slot.epoch(E::slots_per_epoch()) - 1;
@ -1796,11 +1775,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
// Iterate block roots forwards from the oldest blob slot. // Iterate block roots forwards from the oldest blob slot.
warn!( debug!(
self.log, self.log,
"Pruning blobs sidecars stored longer than data availability boundary"; "Pruning blobs sidecars stored longer than data availability boundary";
"info" => "you may notice degraded I/O performance while this runs"
); );
// todo(emhane): If we notice degraded I/O for users switching modes (prune_blobs=true to
// prune_blobs=false) we could add a warning that only fires on a threshold, e.g. more
// than 2x epochs_per_blob_prune epochs without a prune.
let mut ops = vec![]; let mut ops = vec![];
let mut last_pruned_block_root = None; let mut last_pruned_block_root = None;
@ -1810,10 +1791,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
oldest_blob_slot, oldest_blob_slot,
end_slot, end_slot,
|| { || {
// todo(emhane): In the future, if the data availability boundary is more recent than the // todo(emhane): In the future, if the data availability boundary is more recent
// split (finalized) epoch, this code will have to change to decide what to do // than the split (finalized) epoch, this code will have to change to decide what
// with pruned blobs in our not-yet-finalized canonical chain and not-yet-orphaned // to do with pruned blobs in our not-yet-finalized canonical chain and
// forks (see DBColumn::BeaconBlobOrphan). // not-yet-orphaned forks (see DBColumn::BeaconBlobOrphan).
// //
// Related to review and the spec PRs linked in it: // Related to review and the spec PRs linked in it:
// https://github.com/sigp/lighthouse/pull/3852#pullrequestreview-1244785136 // https://github.com/sigp/lighthouse/pull/3852#pullrequestreview-1244785136
@ -1865,10 +1846,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let blobs_sidecars_pruned = ops.len(); let blobs_sidecars_pruned = ops.len();
let update_blob_info = self.compare_and_set_blob_info( let update_blob_info = self.compare_and_set_blob_info(
Some(blob_info), blob_info,
Some(BlobInfo { BlobInfo {
oldest_blob_slot: end_slot + 1, oldest_blob_slot: Some(end_slot + 1),
}), },
)?; )?;
ops.push(StoreOp::PutRawKVStoreOp(update_blob_info)); ops.push(StoreOp::PutRawKVStoreOp(update_blob_info));

View File

@ -123,7 +123,7 @@ impl StoreItem for AnchorInfo {
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)] #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct BlobInfo { pub struct BlobInfo {
/// The slot after which blobs are available (>=). /// The slot after which blobs are available (>=).
pub oldest_blob_slot: Slot, pub oldest_blob_slot: Option<Slot>,
} }
impl StoreItem for BlobInfo { impl StoreItem for BlobInfo {