use crate::chunked_vector::{ store_updated_vector, BlockRoots, HistoricalRoots, RandaoMixes, StateRoots, }; use crate::config::{OnDiskStoreConfig, StoreConfig}; use crate::forwards_iter::HybridForwardsBlockRootsIterator; use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; use crate::leveldb_store::BytesKey; use crate::leveldb_store::LevelDB; use crate::memory_store::MemoryStore; use crate::metadata::{ CompactionTimestamp, PruningCheckpoint, SchemaVersion, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, }; use crate::metrics; use crate::{ get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, StoreOp, }; use leveldb::iterator::LevelDBIterator; use lru::LruCache; use parking_lot::{Mutex, RwLock}; use slog::{debug, error, info, trace, warn, Logger}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use state_processing::{ per_block_processing, per_slot_processing, BlockProcessingError, BlockSignatureStrategy, SlotProcessingError, }; use std::convert::TryInto; use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use std::time::Duration; use types::*; /// Defines how blocks should be replayed on states. #[derive(PartialEq)] pub enum BlockReplay { /// Perform all transitions faithfully to the specification. Accurate, /// Don't compute state roots, eventually computing an invalid beacon state that can only be /// used for obtaining shuffling. InconsistentStateRoots, } /// On-disk database that stores finalized states efficiently. /// /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores /// intermittent "restore point" states pre-finalization. #[derive(Debug)] pub struct HotColdDB, Cold: ItemStore> { /// The slot and state root at the point where the database is split between hot and cold. /// /// States with slots less than `split.slot` are in the cold DB, while states with slots /// greater than or equal are in the hot DB. split: RwLock, config: StoreConfig, /// Cold database containing compact historical data. pub cold_db: Cold, /// Hot database containing duplicated but quick-to-access recent data. /// /// The hot database also contains all blocks. pub hot_db: Hot, /// LRU cache of deserialized blocks. Updated whenever a block is loaded. block_cache: Mutex>>, /// Chain spec. spec: ChainSpec, /// Logger. pub(crate) log: Logger, /// Mere vessel for E. _phantom: PhantomData, } #[derive(Debug, PartialEq)] pub enum HotColdDBError { UnsupportedSchemaVersion { target_version: SchemaVersion, current_version: SchemaVersion, }, /// Recoverable error indicating that the database freeze point couldn't be updated /// due to the finalized block not lying on an epoch boundary (should be infrequent). FreezeSlotUnaligned(Slot), FreezeSlotError { current_split_slot: Slot, proposed_split_slot: Slot, }, MissingStateToFreeze(Hash256), MissingRestorePointHash(u64), MissingRestorePoint(Hash256), MissingColdStateSummary(Hash256), MissingHotStateSummary(Hash256), MissingEpochBoundaryState(Hash256), MissingSplitState(Hash256, Slot), HotStateSummaryError(BeaconStateError), RestorePointDecodeError(ssz::DecodeError), BlockReplayBeaconError(BeaconStateError), BlockReplaySlotError(SlotProcessingError), BlockReplayBlockError(BlockProcessingError), InvalidSlotsPerRestorePoint { slots_per_restore_point: u64, slots_per_historical_root: u64, slots_per_epoch: u64, }, RestorePointBlockHashError(BeaconStateError), IterationError { unexpected_key: BytesKey, }, } impl HotColdDB, MemoryStore> { pub fn open_ephemeral( config: StoreConfig, spec: ChainSpec, log: Logger, ) -> Result, MemoryStore>, Error> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; let db = HotColdDB { split: RwLock::new(Split::default()), cold_db: MemoryStore::open(), hot_db: MemoryStore::open(), block_cache: Mutex::new(LruCache::new(config.block_cache_size)), config, spec, log, _phantom: PhantomData, }; Ok(db) } } impl HotColdDB, LevelDB> { /// Open a new or existing database, with the given paths to the hot and cold DBs. /// /// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`. pub fn open( hot_path: &Path, cold_path: &Path, config: StoreConfig, spec: ChainSpec, log: Logger, ) -> Result, LevelDB>, Error> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; let db = HotColdDB { split: RwLock::new(Split::default()), cold_db: LevelDB::open(cold_path)?, hot_db: LevelDB::open(hot_path)?, block_cache: Mutex::new(LruCache::new(config.block_cache_size)), config, spec, log, _phantom: PhantomData, }; // Ensure that the schema version of the on-disk database matches the software. // If the version is mismatched, an automatic migration will be attempted. if let Some(schema_version) = db.load_schema_version()? { debug!( db.log, "Attempting schema migration"; "from_version" => schema_version.as_u64(), "to_version" => CURRENT_SCHEMA_VERSION.as_u64(), ); db.migrate_schema(schema_version, CURRENT_SCHEMA_VERSION)?; } else { db.store_schema_version(CURRENT_SCHEMA_VERSION)?; } // Ensure that any on-disk config is compatible with the supplied config. if let Some(disk_config) = db.load_config()? { db.config.check_compatibility(&disk_config)?; } db.store_config()?; // Load the previous split slot from the database (if any). This ensures we can // stop and restart correctly. if let Some(split) = db.load_split()? { info!( db.log, "Hot-Cold DB initialized"; "version" => CURRENT_SCHEMA_VERSION.as_u64(), "split_slot" => split.slot, "split_state" => format!("{:?}", split.state_root) ); *db.split.write() = split; } // Run a garbage collection pass. db.remove_garbage()?; // If configured, run a foreground compaction pass. if db.config.compact_on_init { info!(db.log, "Running foreground compaction"); db.compact()?; info!(db.log, "Foreground compaction complete"); } Ok(db) } /// Return an iterator over the state roots of all temporary states. pub fn iter_temporary_state_roots(&self) -> impl Iterator> + '_ { let column = DBColumn::BeaconStateTemporary; let start_key = BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes())); let keys_iter = self.hot_db.keys_iter(); keys_iter.seek(&start_key); keys_iter .take_while(move |key| key.matches_column(column)) .map(move |bytes_key| { bytes_key.remove_column(column).ok_or_else(|| { HotColdDBError::IterationError { unexpected_key: bytes_key, } .into() }) }) } } impl, Cold: ItemStore> HotColdDB { /// Store a block and update the LRU cache. pub fn put_block( &self, block_root: &Hash256, block: SignedBeaconBlock, ) -> Result<(), Error> { // Store on disk. self.hot_db.put(block_root, &block)?; // Update cache. self.block_cache.lock().put(*block_root, block); Ok(()) } /// Fetch a block from the store. pub fn get_block(&self, block_root: &Hash256) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT); // Check the cache. if let Some(block) = self.block_cache.lock().get(block_root) { metrics::inc_counter(&metrics::BEACON_BLOCK_CACHE_HIT_COUNT); return Ok(Some(block.clone())); } // Fetch from database. match self.hot_db.get::>(block_root)? { Some(block) => { // Add to cache. self.block_cache.lock().put(*block_root, block.clone()); Ok(Some(block)) } None => Ok(None), } } /// Delete a block from the store and the block cache. pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> { self.block_cache.lock().pop(block_root); self.hot_db.delete::>(block_root) } pub fn put_state_summary( &self, state_root: &Hash256, summary: HotStateSummary, ) -> Result<(), Error> { self.hot_db.put(state_root, &summary).map_err(Into::into) } /// Store a state in the store. pub fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { if state.slot < self.get_split_slot() { let mut ops: Vec = Vec::new(); self.store_cold_state(state_root, &state, &mut ops)?; self.cold_db.do_atomically(ops) } else { let mut ops: Vec = Vec::new(); self.store_hot_state(state_root, state, &mut ops)?; self.hot_db.do_atomically(ops) } } /// Fetch a state from the store. /// /// If `slot` is provided then it will be used as a hint as to which database should /// be checked. Importantly, if the slot hint is provided and indicates a slot that lies /// in the freezer database, then only the freezer database will be accessed and `Ok(None)` /// will be returned if the provided `state_root` doesn't match the state root of the /// frozen state at `slot`. Consequently, if a state from a non-canonical chain is desired, it's /// best to set `slot` to `None`, or call `load_hot_state` directly. pub fn get_state( &self, state_root: &Hash256, slot: Option, ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT); if let Some(slot) = slot { if slot < self.get_split_slot() { // Although we could avoid a DB lookup by shooting straight for the // frozen state using `load_cold_state_by_slot`, that would be incorrect // in the case where the caller provides a `state_root` that's off the canonical // chain. This way we avoid returning a state that doesn't match `state_root`. self.load_cold_state(state_root) } else { self.load_hot_state(state_root, BlockReplay::Accurate) } } else { match self.load_hot_state(state_root, BlockReplay::Accurate)? { Some(state) => Ok(Some(state)), None => self.load_cold_state(state_root), } } } /// Fetch a state from the store, but don't compute all of the values when replaying blocks /// upon that state (e.g., state roots). Additionally, only states from the hot store are /// returned. /// /// See `Self::get_state` for information about `slot`. /// /// ## Warning /// /// The returned state **is not a valid beacon state**, it can only be used for obtaining /// shuffling to process attestations. pub fn get_inconsistent_state_for_attestation_verification_only( &self, state_root: &Hash256, slot: Option, ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT); if slot.map_or(false, |slot| slot < self.get_split_slot()) { Ok(None) } else { self.load_hot_state(state_root, BlockReplay::InconsistentStateRoots) } } /// Delete a state, ensuring it is removed from the LRU cache, as well as from on-disk. /// /// It is assumed that all states being deleted reside in the hot DB, even if their slot is less /// than the split point. You shouldn't delete states from the finalized portion of the chain /// (which are frozen, and won't be deleted), or valid descendents of the finalized checkpoint /// (which will be deleted by this function but shouldn't be). pub fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> { // Delete the state summary. self.hot_db .key_delete(DBColumn::BeaconStateSummary.into(), state_root.as_bytes())?; // Delete the full state if it lies on an epoch boundary. if slot % E::slots_per_epoch() == 0 { self.hot_db .key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())?; } Ok(()) } pub fn forwards_block_roots_iterator( store: Arc, start_slot: Slot, end_state: BeaconState, end_block_root: Hash256, spec: &ChainSpec, ) -> Result>, Error> { HybridForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root, spec) } /// Load an epoch boundary state by using the hot state summary look-up. /// /// Will fall back to the cold DB if a hot state summary is not found. pub fn load_epoch_boundary_state( &self, state_root: &Hash256, ) -> Result>, Error> { if let Some(HotStateSummary { epoch_boundary_state_root, .. }) = self.load_hot_state_summary(state_root)? { // NOTE: minor inefficiency here because we load an unnecessary hot state summary // // `BlockReplay` should be irrelevant here since we never replay blocks for an epoch // boundary state in the hot DB. let state = self .load_hot_state(&epoch_boundary_state_root, BlockReplay::Accurate)? .ok_or(HotColdDBError::MissingEpochBoundaryState( epoch_boundary_state_root, ))?; Ok(Some(state)) } else { // Try the cold DB match self.load_cold_state_slot(state_root)? { Some(state_slot) => { let epoch_boundary_slot = state_slot / E::slots_per_epoch() * E::slots_per_epoch(); self.load_cold_state_by_slot(epoch_boundary_slot).map(Some) } None => Ok(None), } } } pub fn put_item(&self, key: &Hash256, item: &I) -> Result<(), Error> { self.hot_db.put(key, item) } pub fn get_item(&self, key: &Hash256) -> Result, Error> { self.hot_db.get(key) } pub fn item_exists(&self, key: &Hash256) -> Result { self.hot_db.exists::(key) } /// Convert a batch of `StoreOp` to a batch of `KeyValueStoreOp`. pub fn convert_to_kv_batch(&self, batch: &[StoreOp]) -> Result, Error> { let mut key_value_batch = Vec::with_capacity(batch.len()); for op in batch { match op { StoreOp::PutBlock(block_root, block) => { key_value_batch.push(block.as_kv_store_op(*block_root)); } StoreOp::PutState(state_root, state) => { self.store_hot_state(state_root, state, &mut key_value_batch)?; } StoreOp::PutStateSummary(state_root, summary) => { key_value_batch.push(summary.as_kv_store_op(*state_root)); } StoreOp::PutStateTemporaryFlag(state_root) => { key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root)); } StoreOp::DeleteStateTemporaryFlag(state_root) => { let db_key = get_key_for_col(TemporaryFlag::db_column().into(), state_root.as_bytes()); key_value_batch.push(KeyValueStoreOp::DeleteKey(db_key)); } StoreOp::DeleteBlock(block_root) => { let key = get_key_for_col(DBColumn::BeaconBlock.into(), block_root.as_bytes()); key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); } StoreOp::DeleteState(state_root, slot) => { let state_summary_key = get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes()); key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key)); if slot.map_or(true, |slot| slot % E::slots_per_epoch() == 0) { let state_key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes()); key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key)); } } } } Ok(key_value_batch) } pub fn do_atomically(&self, batch: Vec>) -> Result<(), Error> { let mut guard = self.block_cache.lock(); self.hot_db .do_atomically(self.convert_to_kv_batch(&batch)?)?; for op in &batch { match op { StoreOp::PutBlock(block_root, block) => { guard.put(*block_root, (**block).clone()); } StoreOp::PutState(_, _) => (), StoreOp::PutStateSummary(_, _) => (), StoreOp::PutStateTemporaryFlag(_) => (), StoreOp::DeleteStateTemporaryFlag(_) => (), StoreOp::DeleteBlock(block_root) => { guard.pop(block_root); } StoreOp::DeleteState(_, _) => (), } } Ok(()) } /// Store a post-finalization state efficiently in the hot database. /// /// On an epoch boundary, store a full state. On an intermediate slot, store /// just a backpointer to the nearest epoch boundary. pub fn store_hot_state( &self, state_root: &Hash256, state: &BeaconState, ops: &mut Vec, ) -> Result<(), Error> { // On the epoch boundary, store the full state. if state.slot % E::slots_per_epoch() == 0 { trace!( self.log, "Storing full state on epoch boundary"; "slot" => state.slot.as_u64(), "state_root" => format!("{:?}", state_root) ); store_full_state(state_root, &state, ops)?; } // Store a summary of the state. // We store one even for the epoch boundary states, as we may need their slots // when doing a look up by state root. let hot_state_summary = HotStateSummary::new(state_root, state)?; let op = hot_state_summary.as_kv_store_op(*state_root); ops.push(op); Ok(()) } /// Load a post-finalization state from the hot database. /// /// Will replay blocks from the nearest epoch boundary. pub fn load_hot_state( &self, state_root: &Hash256, block_replay: BlockReplay, ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT); // If the state is marked as temporary, do not return it. It will become visible // only once its transaction commits and deletes its temporary flag. if self.load_state_temporary_flag(state_root)?.is_some() { return Ok(None); } if let Some(HotStateSummary { slot, latest_block_root, epoch_boundary_state_root, }) = self.load_hot_state_summary(state_root)? { let boundary_state = get_full_state(&self.hot_db, &epoch_boundary_state_root)?.ok_or( HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root), )?; // Optimization to avoid even *thinking* about replaying blocks if we're already // on an epoch boundary. let state = if slot % E::slots_per_epoch() == 0 { boundary_state } else { let blocks = self.load_blocks_to_replay(boundary_state.slot, slot, latest_block_root)?; self.replay_blocks(boundary_state, blocks, slot, block_replay)? }; Ok(Some(state)) } else { Ok(None) } } /// Store a pre-finalization state in the freezer database. /// /// Will log a warning and not store anything if the state does not lie on a restore point /// boundary. pub fn store_cold_state( &self, state_root: &Hash256, state: &BeaconState, ops: &mut Vec, ) -> Result<(), Error> { if state.slot % self.config.slots_per_restore_point != 0 { warn!( self.log, "Not storing non-restore point state in freezer"; "slot" => state.slot.as_u64(), "state_root" => format!("{:?}", state_root) ); return Ok(()); } trace!( self.log, "Creating restore point"; "slot" => state.slot, "state_root" => format!("{:?}", state_root) ); // 1. Convert to PartialBeaconState and store that in the DB. let partial_state = PartialBeaconState::from_state_forgetful(state); let op = partial_state.as_kv_store_op(*state_root); ops.push(op); // 2. Store updated vector entries. let db = &self.cold_db; store_updated_vector(BlockRoots, db, state, &self.spec, ops)?; store_updated_vector(StateRoots, db, state, &self.spec, ops)?; store_updated_vector(HistoricalRoots, db, state, &self.spec, ops)?; store_updated_vector(RandaoMixes, db, state, &self.spec, ops)?; // 3. Store restore point. let restore_point_index = state.slot.as_u64() / self.config.slots_per_restore_point; self.store_restore_point_hash(restore_point_index, *state_root, ops); Ok(()) } /// Try to load a pre-finalization state from the freezer database. /// /// Return `None` if no state with `state_root` lies in the freezer. pub fn load_cold_state(&self, state_root: &Hash256) -> Result>, Error> { match self.load_cold_state_slot(state_root)? { Some(slot) => self.load_cold_state_by_slot(slot).map(Some), None => Ok(None), } } /// Load a pre-finalization state from the freezer database. /// /// Will reconstruct the state if it lies between restore points. pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result, Error> { if slot % self.config.slots_per_restore_point == 0 { let restore_point_idx = slot.as_u64() / self.config.slots_per_restore_point; self.load_restore_point_by_index(restore_point_idx) } else { self.load_cold_intermediate_state(slot) } } /// Load a restore point state by its `state_root`. fn load_restore_point(&self, state_root: &Hash256) -> Result, Error> { let mut partial_state: PartialBeaconState = self .cold_db .get(state_root)? .ok_or_else(|| HotColdDBError::MissingRestorePoint(*state_root))?; // Fill in the fields of the partial state. partial_state.load_block_roots(&self.cold_db, &self.spec)?; partial_state.load_state_roots(&self.cold_db, &self.spec)?; partial_state.load_historical_roots(&self.cold_db, &self.spec)?; partial_state.load_randao_mixes(&self.cold_db, &self.spec)?; Ok(partial_state.try_into()?) } /// Load a restore point state by its `restore_point_index`. fn load_restore_point_by_index( &self, restore_point_index: u64, ) -> Result, Error> { let state_root = self.load_restore_point_hash(restore_point_index)?; self.load_restore_point(&state_root) } /// Load a frozen state that lies between restore points. fn load_cold_intermediate_state(&self, slot: Slot) -> Result, Error> { // 1. Load the restore points either side of the intermediate state. let low_restore_point_idx = slot.as_u64() / self.config.slots_per_restore_point; let high_restore_point_idx = low_restore_point_idx + 1; // Acquire the read lock, so that the split can't change while this is happening. let split = self.split.read(); let low_restore_point = self.load_restore_point_by_index(low_restore_point_idx)?; // If the slot of the high point lies outside the freezer, use the split state // as the upper restore point. let high_restore_point = if high_restore_point_idx * self.config.slots_per_restore_point >= split.slot.as_u64() { self.get_state(&split.state_root, Some(split.slot))?.ok_or( HotColdDBError::MissingSplitState(split.state_root, split.slot), )? } else { self.load_restore_point_by_index(high_restore_point_idx)? }; // 2. Load the blocks from the high restore point back to the low restore point. let blocks = self.load_blocks_to_replay( low_restore_point.slot, slot, self.get_high_restore_point_block_root(&high_restore_point, slot)?, )?; // 3. Replay the blocks on top of the low restore point. self.replay_blocks(low_restore_point, blocks, slot, BlockReplay::Accurate) } /// Get a suitable block root for backtracking from `high_restore_point` to the state at `slot`. /// /// Defaults to the block root for `slot`, which *should* be in range. fn get_high_restore_point_block_root( &self, high_restore_point: &BeaconState, slot: Slot, ) -> Result { high_restore_point .get_block_root(slot) .or_else(|_| high_restore_point.get_oldest_block_root()) .map(|x| *x) .map_err(HotColdDBError::RestorePointBlockHashError) } /// Load the blocks between `start_slot` and `end_slot` by backtracking from `end_block_hash`. /// /// Blocks are returned in slot-ascending order, suitable for replaying on a state with slot /// equal to `start_slot`, to reach a state with slot equal to `end_slot`. fn load_blocks_to_replay( &self, start_slot: Slot, end_slot: Slot, end_block_hash: Hash256, ) -> Result>, Error> { let mut blocks: Vec> = ParentRootBlockIterator::new(self, end_block_hash) .map(|result| result.map(|(_, block)| block)) // Include the block at the end slot (if any), it needs to be // replayed in order to construct the canonical state at `end_slot`. .filter(|result| { result .as_ref() .map_or(true, |block| block.message.slot <= end_slot) }) // Include the block at the start slot (if any). Whilst it doesn't need to be applied // to the state, it contains a potentially useful state root. .take_while(|result| { result .as_ref() .map_or(true, |block| block.message.slot >= start_slot) }) .collect::>()?; blocks.reverse(); Ok(blocks) } /// Replay `blocks` on top of `state` until `target_slot` is reached. /// /// Will skip slots as necessary. The returned state is not guaranteed /// to have any caches built, beyond those immediately required by block processing. fn replay_blocks( &self, mut state: BeaconState, mut blocks: Vec>, target_slot: Slot, block_replay: BlockReplay, ) -> Result, Error> { if block_replay == BlockReplay::InconsistentStateRoots { for i in 0..blocks.len() { blocks[i].message.state_root = Hash256::zero(); if i > 0 { blocks[i].message.parent_root = blocks[i - 1].canonical_root() } } } let state_root_from_prev_block = |i: usize, state: &BeaconState| { if i > 0 { let prev_block = &blocks[i - 1].message; if prev_block.slot == state.slot { Some(prev_block.state_root) } else { None } } else { None } }; for (i, block) in blocks.iter().enumerate() { if block.message.slot <= state.slot { continue; } while state.slot < block.message.slot { let state_root = match block_replay { BlockReplay::Accurate => state_root_from_prev_block(i, &state), BlockReplay::InconsistentStateRoots => Some(Hash256::zero()), }; per_slot_processing(&mut state, state_root, &self.spec) .map_err(HotColdDBError::BlockReplaySlotError)?; } per_block_processing( &mut state, &block, None, BlockSignatureStrategy::NoVerification, &self.spec, ) .map_err(HotColdDBError::BlockReplayBlockError)?; } while state.slot < target_slot { let state_root = match block_replay { BlockReplay::Accurate => state_root_from_prev_block(blocks.len(), &state), BlockReplay::InconsistentStateRoots => Some(Hash256::zero()), }; per_slot_processing(&mut state, state_root, &self.spec) .map_err(HotColdDBError::BlockReplaySlotError)?; } Ok(state) } /// Fetch a copy of the current split slot from memory. pub fn get_split_slot(&self) -> Slot { self.split.read().slot } /// Fetch the slot of the most recently stored restore point. pub fn get_latest_restore_point_slot(&self) -> Slot { (self.get_split_slot() - 1) / self.config.slots_per_restore_point * self.config.slots_per_restore_point } /// Load the database schema version from disk. fn load_schema_version(&self) -> Result, Error> { self.hot_db.get(&SCHEMA_VERSION_KEY) } /// Store the database schema version. pub(crate) fn store_schema_version(&self, schema_version: SchemaVersion) -> Result<(), Error> { self.hot_db.put(&SCHEMA_VERSION_KEY, &schema_version) } /// Load previously-stored config from disk. fn load_config(&self) -> Result, Error> { self.hot_db.get(&CONFIG_KEY) } /// Write the config to disk. fn store_config(&self) -> Result<(), Error> { self.hot_db.put(&CONFIG_KEY, &self.config.as_disk_config()) } /// Load the split point from disk. fn load_split(&self) -> Result, Error> { self.hot_db.get(&SPLIT_KEY) } /// Load the state root of a restore point. fn load_restore_point_hash(&self, restore_point_index: u64) -> Result { let key = Self::restore_point_key(restore_point_index); self.cold_db .get(&key)? .map(|r: RestorePointHash| r.state_root) .ok_or_else(|| HotColdDBError::MissingRestorePointHash(restore_point_index).into()) } /// Store the state root of a restore point. fn store_restore_point_hash( &self, restore_point_index: u64, state_root: Hash256, ops: &mut Vec, ) { let value = &RestorePointHash { state_root }; let op = value.as_kv_store_op(Self::restore_point_key(restore_point_index)); ops.push(op); } /// Convert a `restore_point_index` into a database key. fn restore_point_key(restore_point_index: u64) -> Hash256 { Hash256::from_low_u64_be(restore_point_index) } /// Load a frozen state's slot, given its root. fn load_cold_state_slot(&self, state_root: &Hash256) -> Result, Error> { Ok(self .cold_db .get(state_root)? .map(|s: ColdStateSummary| s.slot)) } /// Load a hot state's summary, given its root. pub fn load_hot_state_summary( &self, state_root: &Hash256, ) -> Result, Error> { self.hot_db.get(state_root) } /// Load the temporary flag for a state root, if one exists. /// /// Returns `Some` if the state is temporary, or `None` if the state is permanent or does not /// exist -- you should call `load_hot_state_summary` to find out which. pub fn load_state_temporary_flag( &self, state_root: &Hash256, ) -> Result, Error> { self.hot_db.get(state_root) } /// Check that the restore point frequency is valid. /// /// Specifically, check that it is: /// (1) A divisor of the number of slots per historical root, and /// (2) Divisible by the number of slots per epoch /// /// /// (1) ensures that we have at least one restore point within range of our state /// root history when iterating backwards (and allows for more frequent restore points if /// desired). /// /// (2) ensures that restore points align with hot state summaries, making it /// quick to migrate hot to cold. fn verify_slots_per_restore_point(slots_per_restore_point: u64) -> Result<(), HotColdDBError> { let slots_per_historical_root = E::SlotsPerHistoricalRoot::to_u64(); let slots_per_epoch = E::slots_per_epoch(); if slots_per_restore_point > 0 && slots_per_historical_root % slots_per_restore_point == 0 && slots_per_restore_point % slots_per_epoch == 0 { Ok(()) } else { Err(HotColdDBError::InvalidSlotsPerRestorePoint { slots_per_restore_point, slots_per_historical_root, slots_per_epoch, }) } } /// Run a compaction pass to free up space used by deleted states. pub fn compact(&self) -> Result<(), Error> { self.hot_db.compact()?; Ok(()) } /// Return `true` if compaction on finalization/pruning is enabled. pub fn compact_on_prune(&self) -> bool { self.config.compact_on_prune } /// Load the checkpoint to begin pruning from (the "old finalized checkpoint"). pub fn load_pruning_checkpoint(&self) -> Result, Error> { Ok(self .hot_db .get(&PRUNING_CHECKPOINT_KEY)? .map(|pc: PruningCheckpoint| pc.checkpoint)) } /// Create a staged store for the pruning checkpoint. pub fn pruning_checkpoint_store_op(&self, checkpoint: Checkpoint) -> KeyValueStoreOp { PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY) } /// Load the timestamp of the last compaction as a `Duration` since the UNIX epoch. pub fn load_compaction_timestamp(&self) -> Result, Error> { Ok(self .hot_db .get(&COMPACTION_TIMESTAMP_KEY)? .map(|c: CompactionTimestamp| Duration::from_secs(c.0))) } /// Store the timestamp of the last compaction as a `Duration` since the UNIX epoch. pub fn store_compaction_timestamp(&self, compaction_timestamp: Duration) -> Result<(), Error> { self.hot_db.put( &COMPACTION_TIMESTAMP_KEY, &CompactionTimestamp(compaction_timestamp.as_secs()), ) } } /// Advance the split point of the store, moving new finalized states to the freezer. pub fn migrate_database, Cold: ItemStore>( store: Arc>, frozen_head_root: Hash256, frozen_head: &BeaconState, ) -> Result<(), Error> { debug!( store.log, "Freezer migration started"; "slot" => frozen_head.slot ); // 0. Check that the migration is sensible. // The new frozen head must increase the current split slot, and lie on an epoch // boundary (in order for the hot state summary scheme to work). let current_split_slot = store.split.read().slot; if frozen_head.slot < current_split_slot { return Err(HotColdDBError::FreezeSlotError { current_split_slot, proposed_split_slot: frozen_head.slot, } .into()); } if frozen_head.slot % E::slots_per_epoch() != 0 { return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot).into()); } let mut hot_db_ops: Vec> = Vec::new(); // 1. Copy all of the states between the head and the split slot, from the hot DB // to the cold DB. let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head); for maybe_pair in state_root_iter.take_while(|result| match result { Ok((_, slot)) => slot >= ¤t_split_slot, Err(_) => true, }) { let (state_root, slot) = maybe_pair?; let mut cold_db_ops: Vec = Vec::new(); if slot % store.config.slots_per_restore_point == 0 { let state: BeaconState = get_full_state(&store.hot_db, &state_root)? .ok_or(HotColdDBError::MissingStateToFreeze(state_root))?; store.store_cold_state(&state_root, &state, &mut cold_db_ops)?; } // Store a pointer from this state root to its slot, so we can later reconstruct states // from their state root alone. let cold_state_summary = ColdStateSummary { slot }; let op = cold_state_summary.as_kv_store_op(state_root); cold_db_ops.push(op); // There are data dependencies between calls to `store_cold_state()` that prevent us from // doing one big call to `store.cold_db.do_atomically()` at end of the loop. store.cold_db.do_atomically(cold_db_ops)?; // Delete the old summary, and the full state if we lie on an epoch boundary. hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot))); } // Warning: Critical section. We have to take care not to put any of the two databases in an // inconsistent state if the OS process dies at any point during the freezeing // procedure. // // Since it is pretty much impossible to be atomic across more than one database, we trade // losing track of states to delete, for consistency. In other words: We should be safe to die // at any point below but it may happen that some states won't be deleted from the hot database // and will remain there forever. Since dying in these particular few lines should be an // exceedingly rare event, this should be an acceptable tradeoff. // Flush to disk all the states that have just been migrated to the cold store. store.cold_db.sync()?; { let mut split_guard = store.split.write(); let latest_split_slot = split_guard.slot; // Detect a sitation where the split point is (erroneously) changed from more than one // place in code. if latest_split_slot != current_split_slot { error!( store.log, "Race condition detected: Split point changed while moving states to the freezer"; "previous split slot" => current_split_slot, "current split slot" => latest_split_slot, ); // Assume the freezing procedure will be retried in case this happens. return Err(Error::SplitPointModified( current_split_slot, latest_split_slot, )); } // Before updating the in-memory split value, we flush it to disk first, so that should the // OS process die at this point, we pick up from the right place after a restart. let split = Split { slot: frozen_head.slot, state_root: frozen_head_root, }; store.hot_db.put_sync(&SPLIT_KEY, &split)?; // Split point is now persisted in the hot database on disk. The in-memory split point // hasn't been modified elsewhere since we keep a write lock on it. It's safe to update // the in-memory split point now. *split_guard = split; } // Delete the states from the hot database if we got this far. store.do_atomically(hot_db_ops)?; debug!( store.log, "Freezer migration complete"; "slot" => frozen_head.slot ); Ok(()) } /// Struct for storing the split slot and state root in the database. #[derive(Debug, Clone, Copy, Default, Encode, Decode)] pub struct Split { slot: Slot, state_root: Hash256, } impl StoreItem for Split { fn db_column() -> DBColumn { DBColumn::BeaconMeta } fn as_store_bytes(&self) -> Vec { self.as_ssz_bytes() } fn from_store_bytes(bytes: &[u8]) -> Result { Ok(Self::from_ssz_bytes(bytes)?) } } /// Struct for summarising a state in the hot database. /// /// Allows full reconstruction by replaying blocks. #[derive(Debug, Clone, Copy, Default, Encode, Decode)] pub struct HotStateSummary { slot: Slot, latest_block_root: Hash256, epoch_boundary_state_root: Hash256, } impl StoreItem for HotStateSummary { fn db_column() -> DBColumn { DBColumn::BeaconStateSummary } fn as_store_bytes(&self) -> Vec { self.as_ssz_bytes() } fn from_store_bytes(bytes: &[u8]) -> Result { Ok(Self::from_ssz_bytes(bytes)?) } } impl HotStateSummary { /// Construct a new summary of the given state. pub fn new(state_root: &Hash256, state: &BeaconState) -> Result { // Fill in the state root on the latest block header if necessary (this happens on all // slots where there isn't a skip). let latest_block_root = state.get_latest_block_root(*state_root); let epoch_boundary_slot = state.slot / E::slots_per_epoch() * E::slots_per_epoch(); let epoch_boundary_state_root = if epoch_boundary_slot == state.slot { *state_root } else { *state .get_state_root(epoch_boundary_slot) .map_err(HotColdDBError::HotStateSummaryError)? }; Ok(HotStateSummary { slot: state.slot, latest_block_root, epoch_boundary_state_root, }) } } /// Struct for summarising a state in the freezer database. #[derive(Debug, Clone, Copy, Default, Encode, Decode)] struct ColdStateSummary { slot: Slot, } impl StoreItem for ColdStateSummary { fn db_column() -> DBColumn { DBColumn::BeaconStateSummary } fn as_store_bytes(&self) -> Vec { self.as_ssz_bytes() } fn from_store_bytes(bytes: &[u8]) -> Result { Ok(Self::from_ssz_bytes(bytes)?) } } /// Struct for storing the state root of a restore point in the database. #[derive(Debug, Clone, Copy, Default, Encode, Decode)] struct RestorePointHash { state_root: Hash256, } impl StoreItem for RestorePointHash { fn db_column() -> DBColumn { DBColumn::BeaconRestorePoint } fn as_store_bytes(&self) -> Vec { self.as_ssz_bytes() } fn from_store_bytes(bytes: &[u8]) -> Result { Ok(Self::from_ssz_bytes(bytes)?) } } #[derive(Debug, Clone, Copy, Default)] pub struct TemporaryFlag; impl StoreItem for TemporaryFlag { fn db_column() -> DBColumn { DBColumn::BeaconStateTemporary } fn as_store_bytes(&self) -> Vec { vec![] } fn from_store_bytes(_: &[u8]) -> Result { Ok(TemporaryFlag) } }