Add more comments to overflow LRU cache (#4406)

This commit is contained in:
ethDreamer 2023-06-16 08:27:36 -05:00 committed by GitHub
parent a62e52f319
commit e1af24b470
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 73 additions and 12 deletions

View File

@ -27,6 +27,11 @@ use types::{
mod overflow_lru_cache; mod overflow_lru_cache;
/// The LRU Cache stores `PendingComponents` which can store up to
/// `MAX_BLOBS_PER_BLOCK = 4` blobs each. A `BlobSidecar` is 0.131256 MB. So
/// the maximum size of a `PendingComponents` is ~ 0.525024 MB. Setting this
/// to 1024 means the maximum size of the cache is ~ 0.5 GB. But the cache
/// will target a size of less than 75% of capacity.
pub const OVERFLOW_LRU_CAPACITY: usize = 1024; pub const OVERFLOW_LRU_CAPACITY: usize = 1024;
#[derive(Debug, IntoStaticStr)] #[derive(Debug, IntoStaticStr)]

View File

@ -1,3 +1,32 @@
//! This module implements an LRU cache for storing partially available blocks and blobs.
//! When the cache overflows, the least recently used items are persisted to the database.
//! This prevents lighthouse from using too much memory storing unfinalized blocks and blobs
//! if the chain were to lose finality.
//!
//! ## Deadlock safety
//!
//! The main object in this module is the `OverflowLRUCache`. It contains two locks:
//!
//! - `self.critical` is an `RwLock` that protects content stored in memory.
//! - `self.maintenance_lock` is held when moving data between memory and disk.
//!
//! You mostly need to ensure that you don't try to hold the critical lock more than once
//! at the same time, as acquiring it again while it is already held can deadlock.
//!
//! ## Basic Algorithm
//!
//! As blocks and blobs come in from the network, their components are stored in memory in
//! this cache. When a block becomes fully available, it is removed from the cache and
//! imported into fork-choice. Blocks/blobs that remain unavailable will linger in the
//! cache until they are older than the finalized epoch or older than the data availability
//! cutoff. In the event the chain is not finalizing, the cache will eventually overflow and
//! the least recently used items will be persisted to disk. When this happens, we will still
//! store the hash of the block in memory so we always know we have data for that block
//! without needing to check the database.
//!
//! When the client is shut down, all pending components are persisted in the database.
//! On startup, only the keys of these components are read into memory; the components
//! themselves are loaded into the cache when they are accessed.
use crate::beacon_chain::BeaconStore; use crate::beacon_chain::BeaconStore;
use crate::blob_verification::KzgVerifiedBlob; use crate::blob_verification::KzgVerifiedBlob;
use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecutedBlock}; use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecutedBlock};
@ -15,8 +44,7 @@ use types::{BlobSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock};
type MissingBlobInfo<T> = (Option<Arc<SignedBeaconBlock<T>>>, HashSet<usize>); type MissingBlobInfo<T> = (Option<Arc<SignedBeaconBlock<T>>>, HashSet<usize>);
/// Caches partially available blobs and execution verified blocks corresponding /// This represents the components of a partially available block
/// to a given `block_hash` that are received over gossip.
/// ///
/// The blobs are all gossip and kzg verified. /// The blobs are all gossip and kzg verified.
/// The block has completed all verifications except the availability check. /// The block has completed all verifications except the availability check.
@ -102,6 +130,8 @@ impl<T: EthSpec> PendingComponents<T> {
} }
} }
/// Blocks and blobs are stored in the database sequentially so that it's
/// fast to iterate over all the data for a particular block.
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
enum OverflowKey { enum OverflowKey {
Block(Hash256), Block(Hash256),
@ -136,6 +166,7 @@ impl OverflowKey {
struct OverflowStore<T: BeaconChainTypes>(BeaconStore<T>); struct OverflowStore<T: BeaconChainTypes>(BeaconStore<T>);
impl<T: BeaconChainTypes> OverflowStore<T> { impl<T: BeaconChainTypes> OverflowStore<T> {
/// Store pending components in the database
pub fn persist_pending_components( pub fn persist_pending_components(
&self, &self,
block_root: Hash256, block_root: Hash256,
@ -167,7 +198,8 @@ impl<T: BeaconChainTypes> OverflowStore<T> {
Ok(()) Ok(())
} }
pub fn get_pending_components( /// Load the pending components that we have in the database for a given block root
pub fn load_pending_components(
&self, &self,
block_root: Hash256, block_root: Hash256,
) -> Result<Option<PendingComponents<T::EthSpec>>, AvailabilityCheckError> { ) -> Result<Option<PendingComponents<T::EthSpec>>, AvailabilityCheckError> {
@ -201,7 +233,7 @@ impl<T: BeaconChainTypes> OverflowStore<T> {
Ok(maybe_pending_components) Ok(maybe_pending_components)
} }
// returns the hashes of all the blocks we have data for on disk /// Returns the hashes of all the blocks we have any data for on disk
pub fn read_keys_on_disk(&self) -> Result<HashSet<Hash256>, AvailabilityCheckError> { pub fn read_keys_on_disk(&self) -> Result<HashSet<Hash256>, AvailabilityCheckError> {
let mut disk_keys = HashSet::new(); let mut disk_keys = HashSet::new();
for res in self.0.hot_db.iter_raw_keys(DBColumn::OverflowLRUCache, &[]) { for res in self.0.hot_db.iter_raw_keys(DBColumn::OverflowLRUCache, &[]) {
@ -211,6 +243,7 @@ impl<T: BeaconChainTypes> OverflowStore<T> {
Ok(disk_keys) Ok(disk_keys)
} }
/// Load a single block from the database (ignoring blobs)
pub fn load_block( pub fn load_block(
&self, &self,
block_root: &Hash256, block_root: &Hash256,
@ -227,6 +260,7 @@ impl<T: BeaconChainTypes> OverflowStore<T> {
.map_err(|e| e.into()) .map_err(|e| e.into())
} }
/// Load a single blob from the database
pub fn load_blob( pub fn load_blob(
&self, &self,
blob_id: &BlobIdentifier, blob_id: &BlobIdentifier,
@ -241,6 +275,7 @@ impl<T: BeaconChainTypes> OverflowStore<T> {
.map_err(|e| e.into()) .map_err(|e| e.into())
} }
/// Delete a set of keys from the database
pub fn delete_keys(&self, keys: &Vec<OverflowKey>) -> Result<(), AvailabilityCheckError> { pub fn delete_keys(&self, keys: &Vec<OverflowKey>) -> Result<(), AvailabilityCheckError> {
for key in keys { for key in keys {
self.0 self.0
@ -251,9 +286,13 @@ impl<T: BeaconChainTypes> OverflowStore<T> {
} }
} }
// This data is protected by an RwLock /// This data stores the *critical* data that we need to keep in memory
/// protected by the `RwLock`
struct Critical<T: BeaconChainTypes> { struct Critical<T: BeaconChainTypes> {
/// This is the LRU cache of pending components
pub in_memory: LruCache<Hash256, PendingComponents<T::EthSpec>>, pub in_memory: LruCache<Hash256, PendingComponents<T::EthSpec>>,
/// This holds all the roots of the blocks for which we have
/// `PendingComponents` in the database.
pub store_keys: HashSet<Hash256>, pub store_keys: HashSet<Hash256>,
} }
@ -322,7 +361,10 @@ impl<T: BeaconChainTypes> Critical<T> {
None => { None => {
// not in memory, is it in the store? // not in memory, is it in the store?
if self.store_keys.remove(&block_root) { if self.store_keys.remove(&block_root) {
store.get_pending_components(block_root) // We don't need to remove the data from the store as we have removed it from
// `store_keys` so we won't go looking for it on disk. The maintenance thread
// will remove it from disk the next time it runs.
store.load_pending_components(block_root)
} else { } else {
Ok(None) Ok(None)
} }
@ -331,10 +373,16 @@ impl<T: BeaconChainTypes> Critical<T> {
} }
} }
/// This is the main struct for this module. Outside methods should
/// interact with the cache through this.
pub struct OverflowLRUCache<T: BeaconChainTypes> { pub struct OverflowLRUCache<T: BeaconChainTypes> {
/// Contains all the data we keep in memory, protected by an RwLock
critical: RwLock<Critical<T>>, critical: RwLock<Critical<T>>,
/// This is how we read and write components to the disk
overflow_store: OverflowStore<T>, overflow_store: OverflowStore<T>,
/// Mutex to guard maintenance methods which move data between disk and memory
maintenance_lock: Mutex<()>, maintenance_lock: Mutex<()>,
/// The capacity of the LRU cache
capacity: usize, capacity: usize,
} }
@ -354,6 +402,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}) })
} }
/// Returns whether or not a block is in the cache (in memory or on disk)
pub fn has_block(&self, block_root: &Hash256) -> bool { pub fn has_block(&self, block_root: &Hash256) -> bool {
let read_lock = self.critical.read(); let read_lock = self.critical.read();
if read_lock if read_lock
@ -373,6 +422,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
} }
} }
/// Fetch the missing blob info for a block without affecting the LRU ordering
pub fn get_missing_blob_info(&self, block_root: Hash256) -> MissingBlobInfo<T::EthSpec> { pub fn get_missing_blob_info(&self, block_root: Hash256) -> MissingBlobInfo<T::EthSpec> {
let read_lock = self.critical.read(); let read_lock = self.critical.read();
if let Some(cache) = read_lock.in_memory.peek(&block_root) { if let Some(cache) = read_lock.in_memory.peek(&block_root) {
@ -380,7 +430,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
} else if read_lock.store_keys.contains(&block_root) { } else if read_lock.store_keys.contains(&block_root) {
drop(read_lock); drop(read_lock);
// return default if there's an error reading from the store // return default if there's an error reading from the store
match self.overflow_store.get_pending_components(block_root) { match self.overflow_store.load_pending_components(block_root) {
Ok(Some(pending_components)) => pending_components.get_missing_blob_info(), Ok(Some(pending_components)) => pending_components.get_missing_blob_info(),
_ => Default::default(), _ => Default::default(),
} }
@ -389,6 +439,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
} }
} }
/// Fetch a blob from the cache without affecting the LRU ordering
pub fn peek_blob( pub fn peek_blob(
&self, &self,
blob_id: &BlobIdentifier, blob_id: &BlobIdentifier,
@ -558,7 +609,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
} }
} }
// writes all in_memory objects to disk /// write all in memory objects to disk
pub fn write_all_to_disk(&self) -> Result<(), AvailabilityCheckError> { pub fn write_all_to_disk(&self) -> Result<(), AvailabilityCheckError> {
let maintenance_lock = self.maintenance_lock.lock(); let maintenance_lock = self.maintenance_lock.lock();
let mut critical_lock = self.critical.write(); let mut critical_lock = self.critical.write();
@ -577,7 +628,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
Ok(()) Ok(())
} }
// maintain the cache /// maintain the cache
pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> { pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> {
// ensure memory usage is below threshold // ensure memory usage is below threshold
let threshold = self.capacity * 3 / 4; let threshold = self.capacity * 3 / 4;
@ -587,6 +638,8 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
Ok(()) Ok(())
} }
/// Enforce that the size of the cache is below a given threshold by
/// moving the least recently used items to disk.
fn maintain_threshold( fn maintain_threshold(
&self, &self,
threshold: usize, threshold: usize,
@ -645,6 +698,9 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
Ok(()) Ok(())
} }
/// Delete any data on disk that shouldn't be there. This can happen if:
/// 1. The entry has been moved back to memory (or become fully available)
/// 2. The entry belongs to a block older than the cutoff epoch
fn prune_disk(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> { fn prune_disk(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> {
// ensure only one thread at a time can be deleting things from the disk or // ensure only one thread at a time can be deleting things from the disk or
// moving things between memory and storage // moving things between memory and storage
@ -1314,7 +1370,7 @@ mod test {
assert!(cache assert!(cache
.overflow_store .overflow_store
.get_pending_components(roots[0]) .load_pending_components(roots[0])
.expect("should exist") .expect("should exist")
.is_some()); .is_some());
@ -1375,7 +1431,7 @@ mod test {
assert!( assert!(
cache cache
.overflow_store .overflow_store
.get_pending_components(roots[1]) .load_pending_components(roots[1])
.expect("no error") .expect("no error")
.is_some(), .is_some(),
"second block should still be on disk" "second block should still be on disk"
@ -1383,7 +1439,7 @@ mod test {
assert!( assert!(
cache cache
.overflow_store .overflow_store
.get_pending_components(roots[0]) .load_pending_components(roots[0])
.expect("no error") .expect("no error")
.is_none(), .is_none(),
"first block should not be on disk" "first block should not be on disk"