From acd49d988d28ff9c2fce138d2b273e0442570854 Mon Sep 17 00:00:00 2001
From: Michael Sproul
Date: Fri, 23 Oct 2020 01:27:51 +0000
Subject: [PATCH] Implement database temp states to reduce memory usage (#1798)

## Issue Addressed

Closes #800
Closes #1713

## Proposed Changes

Implement the temporary state storage algorithm described in #800. Specifically:

* Add `DBColumn::BeaconStateTemporary`, for storing 0-length temporary marker values.
* Store intermediate states immediately as they are created, marked temporary. Delete the temporary flag if the block is processed successfully.
* Add a garbage collection process to delete leftover temporary states on start-up.
* Bump the database schema version to 2 so that a DB with temporary states can't accidentally be used with older versions of the software. The auto-migration is a no-op, but puts in place some infra that we can use for future migrations (e.g. #1784).

A sketch of the store-confirm-collect lifecycle is given at the end of this description.

## Additional Info

There are two known race conditions: one potentially causing permanent faults (hopefully rare), and the other insignificant.

### Race 1: Permanent state marked temporary

EDIT: this has been fixed by the addition of a lock around the relevant critical section (see the second sketch below).

Suppose 2 threads are trying to store 2 different blocks that share some intermediate states (e.g. they both skip some slots from the current head). Consider this sequence of events:

1. Thread 1 checks if state `s` already exists, and seeing that it doesn't, prepares an atomic commit of `(s, s_temporary_flag)`.
2. Thread 2 does the same, but also gets as far as committing the state txn, finishing the processing of its block, and _deleting_ the temporary flag.
3. Thread 1 is (finally) scheduled again, and marks `s` as temporary with its transaction.
4. a) The process is killed, or thread 1's block fails verification and the temp flag is not deleted. This is a permanent failure! Any attempt to load state `s` will fail... hope it isn't on the main chain! Alternatively, (4b) happens:
   b) Thread 1 finishes, and re-deletes the temporary flag. In this case the failure is transient: state `s` will disappear temporarily, but will come back once thread 1 finishes running.

I _hope_ that steps 1-3 only happen very rarely, and 4a even more rarely, but it's hard to know.

This once again begs the question of why we're using LevelDB (#483), when it clearly doesn't care about atomicity! A ham-fisted fix would be to wrap the hot and cold DBs in locks, which would bring us closer to how other DBs handle read-write transactions. E.g. [LMDB only allows one R/W transaction at a time](https://docs.rs/lmdb/0.8.0/lmdb/struct.Environment.html#method.begin_rw_txn).

### Race 2: Temporary state returned from `get_state`

I don't think this race really matters, but in `load_hot_state`, if another thread stores a state between when we call `load_state_temporary_flag` and when we call `load_hot_state_summary`, then we could end up returning that state even though it's only a temporary state. I can't think of any case where this would be relevant, and I suspect if it did come up, it would be safe/recoverable (having data is safer than _not_ having data).

This could be fixed by using a LevelDB read snapshot, but that would require substantial changes to how we read all our values, so I don't think it's worth it right now.
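For readers who don't want to wade through the diff, here is a minimal, self-contained sketch of the store-confirm-collect lifecycle. `HotDb`, the `u64` roots and the `Vec<u8>` state bytes are illustrative stand-ins for the real `HotColdDB`, `Hash256` and `BeaconState` types; only the flag discipline is meant to be faithful to the PR.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

/// Toy model of the hot DB: states keyed by root, plus a column of zero-length
/// temporary markers (the role played by `DBColumn::BeaconStateTemporary`).
struct HotDb {
    states: Mutex<HashMap<u64, Vec<u8>>>,
    temp_flags: Mutex<HashSet<u64>>,
}

impl HotDb {
    /// Store an intermediate state immediately, marked temporary.
    fn put_temporary_state(&self, root: u64, state: Vec<u8>) {
        self.states.lock().unwrap().insert(root, state);
        self.temp_flags.lock().unwrap().insert(root);
    }

    /// On successful block import, delete the temporary flags
    /// (the `confirmation_db_batch` in the real code).
    fn confirm(&self, roots: &[u64]) {
        let mut flags = self.temp_flags.lock().unwrap();
        for root in roots {
            flags.remove(root);
        }
    }

    /// At start-up, garbage collect states still marked temporary:
    /// they are leftovers from failed or interrupted block imports.
    fn delete_temp_states(&self) {
        let mut states = self.states.lock().unwrap();
        for root in self.temp_flags.lock().unwrap().drain() {
            states.remove(&root);
        }
    }

    /// Temporary states are invisible to readers, as in `load_hot_state`.
    fn get_state(&self, root: u64) -> Option<Vec<u8>> {
        if self.temp_flags.lock().unwrap().contains(&root) {
            return None;
        }
        self.states.lock().unwrap().get(&root).cloned()
    }
}

fn main() {
    let db = HotDb {
        states: Mutex::new(HashMap::new()),
        temp_flags: Mutex::new(HashSet::new()),
    };
    db.put_temporary_state(1, vec![0xaa]);
    assert!(db.get_state(1).is_none()); // invisible while temporary
    db.confirm(&[1]);
    assert!(db.get_state(1).is_some()); // visible once confirmed
}
```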
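And a sketch of the Race 1 fix: both check-then-commit sections take the store's transaction mutex, so steps 1-3 above can no longer interleave. `begin_rw_transaction` mirrors the method this PR adds to `KeyValueStore` (it returns a plain guard, and only excludes writers that also take the lock); the surrounding types are again stand-ins.

```rust
use std::sync::{Mutex, MutexGuard};

/// Stand-in for the store's hot DB, holding only the new transaction mutex.
struct TxnStore {
    transaction_mutex: Mutex<()>,
}

impl TxnStore {
    /// Mirrors `KeyValueStore::begin_rw_transaction` from this PR.
    fn begin_rw_transaction(&self) -> MutexGuard<'_, ()> {
        self.transaction_mutex.lock().unwrap()
    }
}

/// Thread 1's critical section (cf. `block_verification.rs`): the existence
/// check and the `(state, temporary_flag)` commit happen under the lock.
fn store_intermediate_state(db: &TxnStore) {
    let txn_lock = db.begin_rw_transaction();
    // if the state doesn't already exist:
    //     atomically commit the state + its temporary flag
    drop(txn_lock);
}

/// Thread 2's critical section (cf. `beacon_chain.rs`): the confirmation
/// batch that deletes temporary flags takes the same lock, so it cannot
/// slip in between another thread's existence check and commit.
fn import_block(db: &TxnStore) {
    let txn_lock = db.begin_rw_transaction();
    // do_atomically(block + state + confirmation_db_batch)
    drop(txn_lock);
}
```

In the real PR the guard comes from `parking_lot` (no `unwrap()`) and lives on the hot DB, as in `chain.store.hot_db.begin_rw_transaction()`; the `std::sync` version above just keeps the sketch dependency-free.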
--- beacon_node/beacon_chain/src/beacon_chain.rs | 15 +- .../beacon_chain/src/block_verification.rs | 50 ++++-- beacon_node/beacon_chain/src/chain_config.rs | 5 +- beacon_node/beacon_chain/src/migrate.rs | 3 +- beacon_node/beacon_chain/src/test_utils.rs | 2 +- beacon_node/beacon_chain/tests/store_tests.rs | 38 +++++ beacon_node/src/cli.rs | 6 +- beacon_node/store/src/garbage_collection.rs | 38 +++++ beacon_node/store/src/hot_cold_store.rs | 154 +++++++++++++----- beacon_node/store/src/leveldb_store.rs | 33 +++- beacon_node/store/src/lib.rs | 38 ++++- beacon_node/store/src/memory_store.rs | 17 +- beacon_node/store/src/metadata.rs | 8 +- beacon_node/store/src/schema_change.rs | 32 ++++ 14 files changed, 343 insertions(+), 96 deletions(-) create mode 100644 beacon_node/store/src/garbage_collection.rs create mode 100644 beacon_node/store/src/schema_change.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7f70c8f97..e387150fe 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1498,7 +1498,7 @@ impl BeaconChain { let block_root = fully_verified_block.block_root; let mut state = fully_verified_block.state; let current_slot = self.slot()?; - let mut ops = fully_verified_block.intermediate_states; + let mut ops = fully_verified_block.confirmation_db_batch; let attestation_observation_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION); @@ -1623,13 +1623,16 @@ impl BeaconChain { let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); - // Store all the states between the parent block state and this block's slot, the block and state. - ops.push(StoreOp::PutBlock(block_root.into(), signed_block.clone())); - ops.push(StoreOp::PutState( - block.state_root.into(), - Cow::Borrowed(&state), + // Store the block and its state, and execute the confirmation batch for the intermediate + // states, which will delete their temporary flags. + ops.push(StoreOp::PutBlock( + block_root, + Box::new(signed_block.clone()), )); + ops.push(StoreOp::PutState(block.state_root, &state)); + let txn_lock = self.store.hot_db.begin_rw_transaction(); self.store.do_atomically(ops)?; + drop(txn_lock); // The fork choice write-lock is dropped *after* the on-disk database has been updated. // This prevents inconsistency between the two at the expense of concurrency. diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 0e3e7db7d..aaa0425dd 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -63,7 +63,7 @@ use std::borrow::Cow; use std::convert::TryFrom; use std::fs; use std::io::Write; -use store::{Error as DBError, HotColdDB, HotStateSummary, StoreOp}; +use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp}; use tree_hash::TreeHash; use types::{ BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256, @@ -363,7 +363,7 @@ pub struct FullyVerifiedBlock<'a, T: BeaconChainTypes> { pub block_root: Hash256, pub state: BeaconState, pub parent_block: SignedBeaconBlock, - pub intermediate_states: Vec>, + pub confirmation_db_batch: Vec>, } /// Implemented on types that can be converted into a `FullyVerifiedBlock`. 
@@ -676,9 +676,9 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { let catchup_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CATCHUP_STATE); - // Keep a batch of any states that were "skipped" (block-less) in between the parent state - // slot and the block slot. These will be stored in the database. - let mut intermediate_states: Vec> = Vec::new(); + // Stage a batch of operations to be completed atomically if this block is imported + // successfully. + let mut confirmation_db_batch = vec![]; // The block must have a higher slot than its parent. if block.slot() <= parent.beacon_state.slot { @@ -702,18 +702,36 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { // processing, but we get early access to it. let state_root = state.update_tree_hash_cache()?; - let op = if state.slot % T::EthSpec::slots_per_epoch() == 0 { - StoreOp::PutState( - state_root.into(), - Cow::Owned(state.clone_with(CloneConfig::committee_caches_only())), - ) + // Store the state immediately, marking it as temporary, and staging the deletion + // of its temporary status as part of the larger atomic operation. + let txn_lock = chain.store.hot_db.begin_rw_transaction(); + let state_already_exists = + chain.store.load_hot_state_summary(&state_root)?.is_some(); + + let state_batch = if state_already_exists { + // If the state exists, it could be temporary or permanent, but in neither case + // should we rewrite it or store a new temporary flag for it. We *will* stage + // the temporary flag for deletion because it's OK to double-delete the flag, + // and we don't mind if another thread gets there first. + vec![] } else { - StoreOp::PutStateSummary( - state_root.into(), - HotStateSummary::new(&state_root, &state)?, - ) + vec![ + if state.slot % T::EthSpec::slots_per_epoch() == 0 { + StoreOp::PutState(state_root, &state) + } else { + StoreOp::PutStateSummary( + state_root, + HotStateSummary::new(&state_root, &state)?, + ) + }, + StoreOp::PutStateTemporaryFlag(state_root), + ] }; - intermediate_states.push(op); + chain.store.do_atomically(state_batch)?; + drop(txn_lock); + + confirmation_db_batch.push(StoreOp::DeleteStateTemporaryFlag(state_root)); + state_root }; @@ -801,7 +819,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { block_root, state, parent_block: parent.beacon_block, - intermediate_states, + confirmation_db_batch, }) } } diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index d84e62665..53e3f9e5e 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -1,9 +1,6 @@ use serde_derive::{Deserialize, Serialize}; use types::Checkpoint; -/// There is a 693 block skip in the current canonical Medalla chain, we use 700 to be safe. 
-pub const DEFAULT_IMPORT_BLOCK_MAX_SKIP_SLOTS: u64 = 700; - #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] pub struct ChainConfig { /// Maximum number of slots to skip when importing a consensus message (e.g., block, @@ -20,7 +17,7 @@ pub struct ChainConfig { impl Default for ChainConfig { fn default() -> Self { Self { - import_max_skip_slots: Some(DEFAULT_IMPORT_BLOCK_MAX_SKIP_SLOTS), + import_max_skip_slots: None, weak_subjectivity_checkpoint: None, } } diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index ce80232e3..7b41cb31e 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -436,11 +436,12 @@ impl, Cold: ItemStore> BackgroundMigrator> = abandoned_blocks .into_iter() + .map(Into::into) .map(StoreOp::DeleteBlock) .chain( abandoned_states .into_iter() - .map(|(slot, state_hash)| StoreOp::DeleteState(state_hash, slot)), + .map(|(slot, state_hash)| StoreOp::DeleteState(state_hash.into(), Some(slot))), ) .collect(); diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index faa58ed07..00c4f5412 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -730,7 +730,7 @@ where } } - fn set_current_slot(&self, slot: Slot) { + pub fn set_current_slot(&self, slot: Slot) { let current_slot = self.chain.slot().unwrap(); let current_epoch = current_slot.epoch(E::slots_per_epoch()); let epoch = slot.epoch(E::slots_per_epoch()); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 50eccc48f..a082d08d0 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1614,6 +1614,44 @@ fn pruning_test( check_no_blocks_exist(&harness, stray_blocks.values()); } +#[test] +fn garbage_collect_temp_states_from_failed_block() { + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + let slots_per_epoch = E::slots_per_epoch(); + + let genesis_state = harness.get_current_state(); + let block_slot = Slot::new(2 * slots_per_epoch); + let (mut block, state) = harness.make_block(genesis_state, block_slot); + + // Mutate the block to make it invalid, and re-sign it. + block.message.state_root = Hash256::repeat_byte(0xff); + let proposer_index = block.message.proposer_index as usize; + let block = block.message.sign( + &harness.validator_keypairs[proposer_index].sk, + &state.fork, + state.genesis_validators_root, + &harness.spec, + ); + + // The block should be rejected, but should store a bunch of temporary states. + harness.set_current_slot(block_slot); + harness.process_block_result(block).unwrap_err(); + + assert_eq!( + store.iter_temporary_state_roots().count(), + block_slot.as_usize() - 1 + ); + + drop(harness); + drop(store); + + // On startup, the store should garbage collect all the temporary states. + let store = get_store(&db_path); + assert_eq!(store.iter_temporary_state_roots().count(), 0); +} + /// Check that the head state's slot matches `expected_slot`. 
fn check_slot(harness: &TestHarness, expected_slot: u64) { let state = &harness.chain.head().expect("should get head").beacon_state; diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 525516687..b7870121b 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -315,13 +315,11 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .long("max-skip-slots") .help( "Refuse to skip more than this many slots when processing a block or attestation. \ - This prevents nodes on minority forks from wasting our time and RAM, \ - but might need to be raised or set to 'none' in times of extreme network \ - outage." + This prevents nodes on minority forks from wasting our time and disk space, \ + but could also cause unnecessary consensus failures, so is disabled by default." ) .value_name("NUM_SLOTS") .takes_value(true) - .default_value("700") ) .arg( Arg::with_name("wss-checkpoint") diff --git a/beacon_node/store/src/garbage_collection.rs b/beacon_node/store/src/garbage_collection.rs new file mode 100644 index 000000000..a44348636 --- /dev/null +++ b/beacon_node/store/src/garbage_collection.rs @@ -0,0 +1,38 @@ +//! Garbage collection process that runs at start-up to clean up the database. +use crate::hot_cold_store::HotColdDB; +use crate::{Error, LevelDB, StoreOp}; +use slog::debug; +use types::EthSpec; + +impl HotColdDB, LevelDB> +where + E: EthSpec, +{ + /// Clean up the database by performing one-off maintenance at start-up. + pub fn remove_garbage(&self) -> Result<(), Error> { + self.delete_temp_states() + } + + /// Delete the temporary states that were leftover by failed block imports. + pub fn delete_temp_states(&self) -> Result<(), Error> { + let delete_ops = + self.iter_temporary_state_roots() + .try_fold(vec![], |mut ops, state_root| { + let state_root = state_root?; + ops.push(StoreOp::DeleteState(state_root, None)); + ops.push(StoreOp::DeleteStateTemporaryFlag(state_root)); + Result::<_, Error>::Ok(ops) + })?; + + if !delete_ops.is_empty() { + debug!( + self.log, + "Garbage collecting {} temporary states", + delete_ops.len() / 2 + ); + self.do_atomically(delete_ops)?; + } + + Ok(()) + } +} diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 57fb713b6..e8b21c870 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -5,6 +5,7 @@ use crate::config::StoreConfig; use crate::forwards_iter::HybridForwardsBlockRootsIterator; use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; +use crate::leveldb_store::BytesKey; use crate::leveldb_store::LevelDB; use crate::memory_store::MemoryStore; use crate::metadata::{ @@ -15,6 +16,7 @@ use crate::{ get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, StoreOp, }; +use leveldb::iterator::LevelDBIterator; use lru::LruCache; use parking_lot::{Mutex, RwLock}; use slog::{debug, error, info, trace, warn, Logger}; @@ -46,8 +48,6 @@ pub enum BlockReplay { /// intermittent "restore point" states pre-finalization. #[derive(Debug)] pub struct HotColdDB, Cold: ItemStore> { - /// The schema version. Loaded from disk on initialization. - schema_version: SchemaVersion, /// The slot and state root at the point where the database is split between hot and cold. 
/// /// States with slots less than `split.slot` are in the cold DB, while states with slots @@ -73,8 +73,8 @@ pub struct HotColdDB, Cold: ItemStore> { #[derive(Debug, PartialEq)] pub enum HotColdDBError { UnsupportedSchemaVersion { - software_version: SchemaVersion, - disk_version: SchemaVersion, + target_version: SchemaVersion, + current_version: SchemaVersion, }, /// Recoverable error indicating that the database freeze point couldn't be updated /// due to the finalized block not lying on an epoch boundary (should be infrequent). @@ -101,6 +101,9 @@ pub enum HotColdDBError { slots_per_epoch: u64, }, RestorePointBlockHashError(BeaconStateError), + IterationError { + unexpected_key: BytesKey, + }, } impl HotColdDB, MemoryStore> { @@ -112,7 +115,6 @@ impl HotColdDB, MemoryStore> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; let db = HotColdDB { - schema_version: CURRENT_SCHEMA_VERSION, split: RwLock::new(Split::default()), cold_db: MemoryStore::open(), hot_db: MemoryStore::open(), @@ -141,7 +143,6 @@ impl HotColdDB, LevelDB> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; let db = HotColdDB { - schema_version: CURRENT_SCHEMA_VERSION, split: RwLock::new(Split::default()), cold_db: LevelDB::open(cold_path)?, hot_db: LevelDB::open(hot_path)?, @@ -153,15 +154,15 @@ impl HotColdDB, LevelDB> { }; // Ensure that the schema version of the on-disk database matches the software. - // In the future, this would be the spot to hook in auto-migration, etc. + // If the version is mismatched, an automatic migration will be attempted. if let Some(schema_version) = db.load_schema_version()? { - if schema_version != CURRENT_SCHEMA_VERSION { - return Err(HotColdDBError::UnsupportedSchemaVersion { - software_version: CURRENT_SCHEMA_VERSION, - disk_version: schema_version, - } - .into()); - } + debug!( + db.log, + "Attempting schema migration"; + "from_version" => schema_version.as_u64(), + "to_version" => CURRENT_SCHEMA_VERSION.as_u64(), + ); + db.migrate_schema(schema_version, CURRENT_SCHEMA_VERSION)?; } else { db.store_schema_version(CURRENT_SCHEMA_VERSION)?; } @@ -178,14 +179,41 @@ impl HotColdDB, LevelDB> { info!( db.log, "Hot-Cold DB initialized"; - "version" => db.schema_version.0, + "version" => CURRENT_SCHEMA_VERSION.as_u64(), "split_slot" => split.slot, "split_state" => format!("{:?}", split.state_root) ); *db.split.write() = split; } + + // Finally, run a garbage collection pass. + db.remove_garbage()?; + Ok(db) } + + /// Return an iterator over the state roots of all temporary states. 
+ pub fn iter_temporary_state_roots<'a>( + &'a self, + ) -> impl Iterator> + 'a { + let column = DBColumn::BeaconStateTemporary; + let start_key = + BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes())); + + let keys_iter = self.hot_db.keys_iter(); + keys_iter.seek(&start_key); + + keys_iter + .take_while(move |key| key.matches_column(column)) + .map(move |bytes_key| { + bytes_key.remove_column(column).ok_or_else(|| { + HotColdDBError::IterationError { + unexpected_key: bytes_key, + } + .into() + }) + }) + } } impl, Cold: ItemStore> HotColdDB { @@ -391,39 +419,41 @@ impl, Cold: ItemStore> HotColdDB let mut key_value_batch = Vec::with_capacity(batch.len()); for op in batch { match op { - StoreOp::PutBlock(block_hash, block) => { - let untyped_hash: Hash256 = (*block_hash).into(); - key_value_batch.push(block.as_kv_store_op(untyped_hash)); + StoreOp::PutBlock(block_root, block) => { + key_value_batch.push(block.as_kv_store_op(*block_root)); } - StoreOp::PutState(state_hash, state) => { - let untyped_hash: Hash256 = (*state_hash).into(); - self.store_hot_state(&untyped_hash, state, &mut key_value_batch)?; + StoreOp::PutState(state_root, state) => { + self.store_hot_state(state_root, state, &mut key_value_batch)?; } - StoreOp::PutStateSummary(state_hash, summary) => { - let untyped_hash: Hash256 = (*state_hash).into(); - key_value_batch.push(summary.as_kv_store_op(untyped_hash)); + StoreOp::PutStateSummary(state_root, summary) => { + key_value_batch.push(summary.as_kv_store_op(*state_root)); } - StoreOp::DeleteBlock(block_hash) => { - let untyped_hash: Hash256 = (*block_hash).into(); - let key = - get_key_for_col(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes()); + StoreOp::PutStateTemporaryFlag(state_root) => { + key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root)); + } + + StoreOp::DeleteStateTemporaryFlag(state_root) => { + let db_key = + get_key_for_col(TemporaryFlag::db_column().into(), state_root.as_bytes()); + key_value_batch.push(KeyValueStoreOp::DeleteKey(db_key)); + } + + StoreOp::DeleteBlock(block_root) => { + let key = get_key_for_col(DBColumn::BeaconBlock.into(), block_root.as_bytes()); key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); } - StoreOp::DeleteState(state_hash, slot) => { - let untyped_hash: Hash256 = (*state_hash).into(); - let state_summary_key = get_key_for_col( - DBColumn::BeaconStateSummary.into(), - untyped_hash.as_bytes(), - ); + StoreOp::DeleteState(state_root, slot) => { + let state_summary_key = + get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes()); key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key)); - if *slot % E::slots_per_epoch() == 0 { + if slot.map_or(true, |slot| slot % E::slots_per_epoch() == 0) { let state_key = - get_key_for_col(DBColumn::BeaconState.into(), untyped_hash.as_bytes()); + get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes()); key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key)); } } @@ -440,18 +470,20 @@ impl, Cold: ItemStore> HotColdDB for op in &batch { match op { - StoreOp::PutBlock(block_hash, block) => { - let untyped_hash: Hash256 = (*block_hash).into(); - guard.put(untyped_hash, block.clone()); + StoreOp::PutBlock(block_root, block) => { + guard.put(*block_root, (**block).clone()); } StoreOp::PutState(_, _) => (), StoreOp::PutStateSummary(_, _) => (), - StoreOp::DeleteBlock(block_hash) => { - let untyped_hash: Hash256 = (*block_hash).into(); - guard.pop(&untyped_hash); + StoreOp::PutStateTemporaryFlag(_) => (), + + 
StoreOp::DeleteStateTemporaryFlag(_) => (), + + StoreOp::DeleteBlock(block_root) => { + guard.pop(block_root); } StoreOp::DeleteState(_, _) => (), @@ -500,6 +532,12 @@ impl, Cold: ItemStore> HotColdDB ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT); + // If the state is marked as temporary, do not return it. It will become visible + // only once its transaction commits and deletes its temporary flag. + if self.load_state_temporary_flag(state_root)?.is_some() { + return Ok(None); + } + if let Some(HotStateSummary { slot, latest_block_root, @@ -785,7 +823,7 @@ impl, Cold: ItemStore> HotColdDB } /// Store the database schema version. - fn store_schema_version(&self, schema_version: SchemaVersion) -> Result<(), Error> { + pub(crate) fn store_schema_version(&self, schema_version: SchemaVersion) -> Result<(), Error> { self.hot_db.put(&SCHEMA_VERSION_KEY, &schema_version) } @@ -846,6 +884,17 @@ impl, Cold: ItemStore> HotColdDB self.hot_db.get(state_root) } + /// Load the temporary flag for a state root, if one exists. + /// + /// Returns `Some` if the state is temporary, or `None` if the state is permanent or does not + /// exist -- you should call `load_hot_state_summary` to find out which. + pub fn load_state_temporary_flag( + &self, + state_root: &Hash256, + ) -> Result, Error> { + self.hot_db.get(state_root) + } + /// Check that the restore point frequency is valid. /// /// Specifically, check that it is: @@ -937,7 +986,7 @@ pub fn migrate_database, Cold: ItemStore>( store.cold_db.do_atomically(cold_db_ops)?; // Delete the old summary, and the full state if we lie on an epoch boundary. - hot_db_ops.push(StoreOp::DeleteState(state_root.into(), slot)); + hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot))); } // Warning: Critical section. We have to take care not to put any of the two databases in an @@ -1107,3 +1156,20 @@ impl StoreItem for RestorePointHash { Ok(Self::from_ssz_bytes(bytes)?) } } + +#[derive(Debug, Clone, Copy, Default)] +pub struct TemporaryFlag; + +impl StoreItem for TemporaryFlag { + fn db_column() -> DBColumn { + DBColumn::BeaconStateTemporary + } + + fn as_store_bytes(&self) -> Vec { + vec![] + } + + fn from_store_bytes(_: &[u8]) -> Result { + Ok(TemporaryFlag) + } +} diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index f1c46610c..fc0dea487 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -5,13 +5,17 @@ use leveldb::database::batch::{Batch, Writebatch}; use leveldb::database::kv::KV; use leveldb::database::Database; use leveldb::error::Error as LevelDBError; +use leveldb::iterator::{Iterable, KeyIterator}; use leveldb::options::{Options, ReadOptions, WriteOptions}; +use parking_lot::{Mutex, MutexGuard}; use std::marker::PhantomData; use std::path::Path; /// A wrapped leveldb database. pub struct LevelDB { db: Database, + /// A mutex to synchronise sensitive read-write transactions. 
+ transaction_mutex: Mutex<()>, _phantom: PhantomData, } @@ -23,9 +27,11 @@ impl LevelDB { options.create_if_missing = true; let db = Database::open(path, options)?; + let transaction_mutex = Mutex::new(()); Ok(Self { db, + transaction_mutex, _phantom: PhantomData, }) } @@ -64,6 +70,10 @@ impl LevelDB { metrics::stop_timer(timer); }) } + + pub fn keys_iter(&self) -> KeyIterator { + self.db.keys_iter(self.read_options()) + } } impl KeyValueStore for LevelDB { @@ -138,11 +148,16 @@ impl KeyValueStore for LevelDB { self.db.write(self.write_options(), &leveldb_batch)?; Ok(()) } + + fn begin_rw_transaction(&self) -> MutexGuard<()> { + self.transaction_mutex.lock() + } } impl ItemStore for LevelDB {} /// Used for keying leveldb. +#[derive(Debug, PartialEq)] pub struct BytesKey { key: Vec, } @@ -158,7 +173,23 @@ impl Key for BytesKey { } impl BytesKey { - fn from_vec(key: Vec) -> Self { + /// Return `true` iff this `BytesKey` was created with the given `column`. + pub fn matches_column(&self, column: DBColumn) -> bool { + self.key.starts_with(column.as_bytes()) + } + + /// Remove the column from a key, returning its `Hash256` portion. + pub fn remove_column(&self, column: DBColumn) -> Option { + if self.matches_column(column) { + let subkey = &self.key[column.as_bytes().len()..]; + if subkey.len() == 32 { + return Some(Hash256::from_slice(subkey)); + } + } + None + } + + pub fn from_vec(key: Vec) -> Self { Self { key } } } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index f249be1f8..38411d9a1 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -15,6 +15,7 @@ pub mod chunked_vector; pub mod config; pub mod errors; mod forwards_iter; +mod garbage_collection; pub mod hot_cold_store; mod impls; mod leveldb_store; @@ -22,11 +23,10 @@ mod memory_store; mod metadata; mod metrics; mod partial_beacon_state; +mod schema_change; pub mod iter; -use std::borrow::Cow; - pub use self::config::StoreConfig; pub use self::hot_cold_store::{BlockReplay, HotColdDB, HotStateSummary, Split}; pub use self::leveldb_store::LevelDB; @@ -35,6 +35,7 @@ pub use self::partial_beacon_state::PartialBeaconState; pub use errors::Error; pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; pub use metrics::scrape_for_metrics; +use parking_lot::MutexGuard; pub use types::*; pub trait KeyValueStore: Sync + Send + Sized + 'static { @@ -60,6 +61,12 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { /// Execute either all of the operations in `batch` or none at all, returning an error. fn do_atomically(&self, batch: Vec) -> Result<(), Error>; + + /// Return a mutex guard that can be used to synchronize sensitive transactions. + /// + /// This doesn't prevent other threads writing to the DB unless they also use + /// this method. In future we may implement a safer mandatory locking scheme. + fn begin_rw_transaction(&self) -> MutexGuard<()>; } pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { @@ -121,13 +128,14 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati /// Reified key-value storage operation. Helps in modifying the storage atomically. 
/// See also https://github.com/sigp/lighthouse/issues/692 -#[allow(clippy::large_enum_variant)] pub enum StoreOp<'a, E: EthSpec> { - PutBlock(SignedBeaconBlockHash, SignedBeaconBlock<E>), - PutState(BeaconStateHash, Cow<'a, BeaconState<E>>), - PutStateSummary(BeaconStateHash, HotStateSummary), - DeleteBlock(SignedBeaconBlockHash), - DeleteState(BeaconStateHash, Slot), + PutBlock(Hash256, Box<SignedBeaconBlock<E>>), + PutState(Hash256, &'a BeaconState<E>), + PutStateSummary(Hash256, HotStateSummary), + PutStateTemporaryFlag(Hash256), + DeleteStateTemporaryFlag(Hash256), + DeleteBlock(Hash256), + DeleteState(Hash256, Option<Slot>), } /// A unique column identifier. @@ -146,6 +154,9 @@ pub enum DBColumn { BeaconRestorePoint, /// For the mapping from state roots to their slots or summaries. BeaconStateSummary, + /// For the list of temporary states stored during block import, + /// and then made non-temporary by the deletion of their state root from this column. + BeaconStateTemporary, BeaconBlockRoots, BeaconStateRoots, BeaconHistoricalRoots, @@ -166,6 +177,7 @@ impl Into<&'static str> for DBColumn { DBColumn::ForkChoice => "frk", DBColumn::BeaconRestorePoint => "brp", DBColumn::BeaconStateSummary => "bss", + DBColumn::BeaconStateTemporary => "bst", DBColumn::BeaconBlockRoots => "bbr", DBColumn::BeaconStateRoots => "bsr", DBColumn::BeaconHistoricalRoots => "bhr", @@ -175,6 +187,16 @@ impl Into<&'static str> for DBColumn { } } +impl DBColumn { + pub fn as_str(self) -> &'static str { + self.into() + } + + pub fn as_bytes(self) -> &'static [u8] { + self.as_str().as_bytes() + } +} + /// An item that may be stored in a `Store` by serializing and deserializing from bytes. pub trait StoreItem: Sized { /// Identifies which column this item should be placed in. diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 19308c86d..2df503965 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,5 +1,5 @@ use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp}; -use parking_lot::RwLock; +use parking_lot::{Mutex, MutexGuard, RwLock}; use std::collections::HashMap; use std::marker::PhantomData; use types::*; @@ -9,23 +9,16 @@ type DBHashMap = HashMap<Vec<u8>, Vec<u8>>; /// A thread-safe `HashMap` wrapper. pub struct MemoryStore<E: EthSpec> { db: RwLock<DBHashMap>, + transaction_mutex: Mutex<()>, _phantom: PhantomData<E>, } -impl<E: EthSpec> Clone for MemoryStore<E> { - fn clone(&self) -> Self { - Self { - db: RwLock::new(self.db.read().clone()), - _phantom: PhantomData, - } - } -} - impl<E: EthSpec> MemoryStore<E> { /// Create a new, empty database. pub fn open() -> Self { Self { db: RwLock::new(HashMap::new()), + transaction_mutex: Mutex::new(()), _phantom: PhantomData, } } @@ -87,6 +80,10 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> { } Ok(()) } + + fn begin_rw_transaction(&self) -> MutexGuard<()> { + self.transaction_mutex.lock() + } } impl<E: EthSpec> ItemStore<E> for MemoryStore<E> {} diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 2d4733d63..c03046986 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -2,7 +2,7 @@ use crate::{DBColumn, Error, StoreItem}; use ssz::{Decode, Encode}; use types::Hash256; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(1); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(2); // All the keys that get stored under the `BeaconMeta` column.
// @@ -14,6 +14,12 @@ pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2); #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct SchemaVersion(pub u64); +impl SchemaVersion { + pub fn as_u64(self) -> u64 { + self.0 + } +} + impl StoreItem for SchemaVersion { fn db_column() -> DBColumn { DBColumn::BeaconMeta diff --git a/beacon_node/store/src/schema_change.rs b/beacon_node/store/src/schema_change.rs new file mode 100644 index 000000000..ad89e47be --- /dev/null +++ b/beacon_node/store/src/schema_change.rs @@ -0,0 +1,32 @@ +//! Utilities for managing database schema changes. +use crate::hot_cold_store::{HotColdDB, HotColdDBError}; +use crate::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}; +use crate::{Error, ItemStore}; +use types::EthSpec; + +impl<E, Hot, Cold> HotColdDB<E, Hot, Cold> +where + E: EthSpec, + Hot: ItemStore<E>, + Cold: ItemStore<E>, +{ + /// Migrate the database from one schema version to another, applying all requisite mutations. + pub fn migrate_schema(&self, from: SchemaVersion, to: SchemaVersion) -> Result<(), Error> { + match (from, to) { + // Migration from v0.3.0 to v0.3.x, adding the temporary states column. + // Nothing actually needs to be done, but once a DB uses v2 it shouldn't go back. + (SchemaVersion(1), SchemaVersion(2)) => { + self.store_schema_version(to)?; + Ok(()) + } + // Migrating from the current schema version to itself is always OK, a no-op. + (_, _) if from == to && to == CURRENT_SCHEMA_VERSION => Ok(()), + // Anything else is an error. + (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { + target_version: to, + current_version: from, + } + .into()), + } + } +}