Implement database temp states to reduce memory usage (#1798)

## Issue Addressed

Closes #800
Closes #1713

## Proposed Changes

Implement the temporary state storage algorithm described in #800. Specifically:

* Add `DBColumn::BeaconStateTemporary`, for storing 0-length temporary marker values.
* Store intermediate states immediately as they are created, marked temporary. Delete the temporary flag if the block is processed successfully.
* Add a garbage collection process to delete leftover temporary states on start-up.
* Bump the database schema version to 2 so that a DB with temporary states can't accidentally be used with older versions of the software. The auto-migration is a no-op, but puts in place some infra that we can use for future migrations (e.g. #1784)

## Additional Info

There are two known race conditions, one potentially causing permanent faults (hopefully rare), and the other insignificant.

### Race 1: Permanent state marked temporary

EDIT: this has been fixed by the addition of a lock around the relevant critical section

There are 2 threads that are trying to store 2 different blocks that share some intermediate states (e.g. they both skip some slots from the current head). Consider this sequence of events:

1. Thread 1 checks if state `s` already exists, and seeing that it doesn't, prepares an atomic commit of `(s, s_temporary_flag)`.
2. Thread 2 does the same, but also gets as far as committing the state txn, finishing the processing of its block, and _deleting_ the temporary flag.
3. Thread 1 is (finally) scheduled again, and marks `s` as temporary with its transaction.
4.
    a) The process is killed, or thread 1's block fails verification and the temp flag is not deleted. This is a permanent failure! Any attempt to load state `s` will fail... hope it isn't on the main chain! Alternatively (4b) happens...
    b) Thread 1 finishes, and re-deletes the temporary flag. In this case the failure is transient, state `s` will disappear temporarily, but will come back once thread 1 finishes running.

I _hope_ that steps 1-3 only happen very rarely, and 4a even more rarely. It's hard to know

This once again begs the question of why we're using LevelDB (#483), when it clearly doesn't care about atomicity! A ham-fisted fix would be to wrap the hot and cold DBs in locks, which would bring us closer to how other DBs handle read-write transactions. E.g. [LMDB only allows one R/W transaction at a time](https://docs.rs/lmdb/0.8.0/lmdb/struct.Environment.html#method.begin_rw_txn).

### Race 2: Temporary state returned from `get_state`

I don't think this race really matters, but in `load_hot_state`, if another thread stores a state between when we call `load_state_temporary_flag` and when we call `load_hot_state_summary`, then we could end up returning that state even though it's only a temporary state. I can't think of any case where this would be relevant, and I suspect if it did come up, it would be safe/recoverable (having data is safer than _not_ having data).

This could be fixed by using a LevelDB read snapshot, but that would require substantial changes to how we read all our values, so I don't think it's worth it right now.
This commit is contained in:
Michael Sproul 2020-10-23 01:27:51 +00:00
parent 66f0cf4430
commit acd49d988d
14 changed files with 343 additions and 96 deletions

View File

@ -1498,7 +1498,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let block_root = fully_verified_block.block_root; let block_root = fully_verified_block.block_root;
let mut state = fully_verified_block.state; let mut state = fully_verified_block.state;
let current_slot = self.slot()?; let current_slot = self.slot()?;
let mut ops = fully_verified_block.intermediate_states; let mut ops = fully_verified_block.confirmation_db_batch;
let attestation_observation_timer = let attestation_observation_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION); metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION);
@ -1623,13 +1623,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);
// Store all the states between the parent block state and this block's slot, the block and state. // Store the block and its state, and execute the confirmation batch for the intermediate
ops.push(StoreOp::PutBlock(block_root.into(), signed_block.clone())); // states, which will delete their temporary flags.
ops.push(StoreOp::PutState( ops.push(StoreOp::PutBlock(
block.state_root.into(), block_root,
Cow::Borrowed(&state), Box::new(signed_block.clone()),
)); ));
ops.push(StoreOp::PutState(block.state_root, &state));
let txn_lock = self.store.hot_db.begin_rw_transaction();
self.store.do_atomically(ops)?; self.store.do_atomically(ops)?;
drop(txn_lock);
// The fork choice write-lock is dropped *after* the on-disk database has been updated. // The fork choice write-lock is dropped *after* the on-disk database has been updated.
// This prevents inconsistency between the two at the expense of concurrency. // This prevents inconsistency between the two at the expense of concurrency.

View File

@ -63,7 +63,7 @@ use std::borrow::Cow;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fs; use std::fs;
use std::io::Write; use std::io::Write;
use store::{Error as DBError, HotColdDB, HotStateSummary, StoreOp}; use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp};
use tree_hash::TreeHash; use tree_hash::TreeHash;
use types::{ use types::{
BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256, BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256,
@ -363,7 +363,7 @@ pub struct FullyVerifiedBlock<'a, T: BeaconChainTypes> {
pub block_root: Hash256, pub block_root: Hash256,
pub state: BeaconState<T::EthSpec>, pub state: BeaconState<T::EthSpec>,
pub parent_block: SignedBeaconBlock<T::EthSpec>, pub parent_block: SignedBeaconBlock<T::EthSpec>,
pub intermediate_states: Vec<StoreOp<'a, T::EthSpec>>, pub confirmation_db_batch: Vec<StoreOp<'a, T::EthSpec>>,
} }
/// Implemented on types that can be converted into a `FullyVerifiedBlock`. /// Implemented on types that can be converted into a `FullyVerifiedBlock`.
@ -676,9 +676,9 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
let catchup_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CATCHUP_STATE); let catchup_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CATCHUP_STATE);
// Keep a batch of any states that were "skipped" (block-less) in between the parent state // Stage a batch of operations to be completed atomically if this block is imported
// slot and the block slot. These will be stored in the database. // successfully.
let mut intermediate_states: Vec<StoreOp<T::EthSpec>> = Vec::new(); let mut confirmation_db_batch = vec![];
// The block must have a higher slot than its parent. // The block must have a higher slot than its parent.
if block.slot() <= parent.beacon_state.slot { if block.slot() <= parent.beacon_state.slot {
@ -702,18 +702,36 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
// processing, but we get early access to it. // processing, but we get early access to it.
let state_root = state.update_tree_hash_cache()?; let state_root = state.update_tree_hash_cache()?;
let op = if state.slot % T::EthSpec::slots_per_epoch() == 0 { // Store the state immediately, marking it as temporary, and staging the deletion
StoreOp::PutState( // of its temporary status as part of the larger atomic operation.
state_root.into(), let txn_lock = chain.store.hot_db.begin_rw_transaction();
Cow::Owned(state.clone_with(CloneConfig::committee_caches_only())), let state_already_exists =
) chain.store.load_hot_state_summary(&state_root)?.is_some();
let state_batch = if state_already_exists {
// If the state exists, it could be temporary or permanent, but in neither case
// should we rewrite it or store a new temporary flag for it. We *will* stage
// the temporary flag for deletion because it's OK to double-delete the flag,
// and we don't mind if another thread gets there first.
vec![]
} else {
vec![
if state.slot % T::EthSpec::slots_per_epoch() == 0 {
StoreOp::PutState(state_root, &state)
} else { } else {
StoreOp::PutStateSummary( StoreOp::PutStateSummary(
state_root.into(), state_root,
HotStateSummary::new(&state_root, &state)?, HotStateSummary::new(&state_root, &state)?,
) )
},
StoreOp::PutStateTemporaryFlag(state_root),
]
}; };
intermediate_states.push(op); chain.store.do_atomically(state_batch)?;
drop(txn_lock);
confirmation_db_batch.push(StoreOp::DeleteStateTemporaryFlag(state_root));
state_root state_root
}; };
@ -801,7 +819,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
block_root, block_root,
state, state,
parent_block: parent.beacon_block, parent_block: parent.beacon_block,
intermediate_states, confirmation_db_batch,
}) })
} }
} }

View File

@ -1,9 +1,6 @@
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use types::Checkpoint; use types::Checkpoint;
/// There is a 693 block skip in the current canonical Medalla chain, we use 700 to be safe.
pub const DEFAULT_IMPORT_BLOCK_MAX_SKIP_SLOTS: u64 = 700;
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct ChainConfig { pub struct ChainConfig {
/// Maximum number of slots to skip when importing a consensus message (e.g., block, /// Maximum number of slots to skip when importing a consensus message (e.g., block,
@ -20,7 +17,7 @@ pub struct ChainConfig {
impl Default for ChainConfig { impl Default for ChainConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
import_max_skip_slots: Some(DEFAULT_IMPORT_BLOCK_MAX_SKIP_SLOTS), import_max_skip_slots: None,
weak_subjectivity_checkpoint: None, weak_subjectivity_checkpoint: None,
} }
} }

View File

@ -436,11 +436,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
let batch: Vec<StoreOp<E>> = abandoned_blocks let batch: Vec<StoreOp<E>> = abandoned_blocks
.into_iter() .into_iter()
.map(Into::into)
.map(StoreOp::DeleteBlock) .map(StoreOp::DeleteBlock)
.chain( .chain(
abandoned_states abandoned_states
.into_iter() .into_iter()
.map(|(slot, state_hash)| StoreOp::DeleteState(state_hash, slot)), .map(|(slot, state_hash)| StoreOp::DeleteState(state_hash.into(), Some(slot))),
) )
.collect(); .collect();

View File

@ -730,7 +730,7 @@ where
} }
} }
fn set_current_slot(&self, slot: Slot) { pub fn set_current_slot(&self, slot: Slot) {
let current_slot = self.chain.slot().unwrap(); let current_slot = self.chain.slot().unwrap();
let current_epoch = current_slot.epoch(E::slots_per_epoch()); let current_epoch = current_slot.epoch(E::slots_per_epoch());
let epoch = slot.epoch(E::slots_per_epoch()); let epoch = slot.epoch(E::slots_per_epoch());

View File

@ -1614,6 +1614,44 @@ fn pruning_test(
check_no_blocks_exist(&harness, stray_blocks.values()); check_no_blocks_exist(&harness, stray_blocks.values());
} }
#[test]
fn garbage_collect_temp_states_from_failed_block() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let slots_per_epoch = E::slots_per_epoch();
let genesis_state = harness.get_current_state();
let block_slot = Slot::new(2 * slots_per_epoch);
let (mut block, state) = harness.make_block(genesis_state, block_slot);
// Mutate the block to make it invalid, and re-sign it.
block.message.state_root = Hash256::repeat_byte(0xff);
let proposer_index = block.message.proposer_index as usize;
let block = block.message.sign(
&harness.validator_keypairs[proposer_index].sk,
&state.fork,
state.genesis_validators_root,
&harness.spec,
);
// The block should be rejected, but should store a bunch of temporary states.
harness.set_current_slot(block_slot);
harness.process_block_result(block).unwrap_err();
assert_eq!(
store.iter_temporary_state_roots().count(),
block_slot.as_usize() - 1
);
drop(harness);
drop(store);
// On startup, the store should garbage collect all the temporary states.
let store = get_store(&db_path);
assert_eq!(store.iter_temporary_state_roots().count(), 0);
}
/// Check that the head state's slot matches `expected_slot`. /// Check that the head state's slot matches `expected_slot`.
fn check_slot(harness: &TestHarness, expected_slot: u64) { fn check_slot(harness: &TestHarness, expected_slot: u64) {
let state = &harness.chain.head().expect("should get head").beacon_state; let state = &harness.chain.head().expect("should get head").beacon_state;

View File

@ -315,13 +315,11 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.long("max-skip-slots") .long("max-skip-slots")
.help( .help(
"Refuse to skip more than this many slots when processing a block or attestation. \ "Refuse to skip more than this many slots when processing a block or attestation. \
This prevents nodes on minority forks from wasting our time and RAM, \ This prevents nodes on minority forks from wasting our time and disk space, \
but might need to be raised or set to 'none' in times of extreme network \ but could also cause unnecessary consensus failures, so is disabled by default."
outage."
) )
.value_name("NUM_SLOTS") .value_name("NUM_SLOTS")
.takes_value(true) .takes_value(true)
.default_value("700")
) )
.arg( .arg(
Arg::with_name("wss-checkpoint") Arg::with_name("wss-checkpoint")

View File

@ -0,0 +1,38 @@
//! Garbage collection process that runs at start-up to clean up the database.
use crate::hot_cold_store::HotColdDB;
use crate::{Error, LevelDB, StoreOp};
use slog::debug;
use types::EthSpec;
impl<E> HotColdDB<E, LevelDB<E>, LevelDB<E>>
where
E: EthSpec,
{
/// Clean up the database by performing one-off maintenance at start-up.
pub fn remove_garbage(&self) -> Result<(), Error> {
self.delete_temp_states()
}
/// Delete the temporary states that were leftover by failed block imports.
pub fn delete_temp_states(&self) -> Result<(), Error> {
let delete_ops =
self.iter_temporary_state_roots()
.try_fold(vec![], |mut ops, state_root| {
let state_root = state_root?;
ops.push(StoreOp::DeleteState(state_root, None));
ops.push(StoreOp::DeleteStateTemporaryFlag(state_root));
Result::<_, Error>::Ok(ops)
})?;
if !delete_ops.is_empty() {
debug!(
self.log,
"Garbage collecting {} temporary states",
delete_ops.len() / 2
);
self.do_atomically(delete_ops)?;
}
Ok(())
}
}

View File

@ -5,6 +5,7 @@ use crate::config::StoreConfig;
use crate::forwards_iter::HybridForwardsBlockRootsIterator; use crate::forwards_iter::HybridForwardsBlockRootsIterator;
use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::leveldb_store::BytesKey;
use crate::leveldb_store::LevelDB; use crate::leveldb_store::LevelDB;
use crate::memory_store::MemoryStore; use crate::memory_store::MemoryStore;
use crate::metadata::{ use crate::metadata::{
@ -15,6 +16,7 @@ use crate::{
get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem,
StoreOp, StoreOp,
}; };
use leveldb::iterator::LevelDBIterator;
use lru::LruCache; use lru::LruCache;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use slog::{debug, error, info, trace, warn, Logger}; use slog::{debug, error, info, trace, warn, Logger};
@ -46,8 +48,6 @@ pub enum BlockReplay {
/// intermittent "restore point" states pre-finalization. /// intermittent "restore point" states pre-finalization.
#[derive(Debug)] #[derive(Debug)]
pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> { pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// The schema version. Loaded from disk on initialization.
schema_version: SchemaVersion,
/// The slot and state root at the point where the database is split between hot and cold. /// The slot and state root at the point where the database is split between hot and cold.
/// ///
/// States with slots less than `split.slot` are in the cold DB, while states with slots /// States with slots less than `split.slot` are in the cold DB, while states with slots
@ -73,8 +73,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum HotColdDBError { pub enum HotColdDBError {
UnsupportedSchemaVersion { UnsupportedSchemaVersion {
software_version: SchemaVersion, target_version: SchemaVersion,
disk_version: SchemaVersion, current_version: SchemaVersion,
}, },
/// Recoverable error indicating that the database freeze point couldn't be updated /// Recoverable error indicating that the database freeze point couldn't be updated
/// due to the finalized block not lying on an epoch boundary (should be infrequent). /// due to the finalized block not lying on an epoch boundary (should be infrequent).
@ -101,6 +101,9 @@ pub enum HotColdDBError {
slots_per_epoch: u64, slots_per_epoch: u64,
}, },
RestorePointBlockHashError(BeaconStateError), RestorePointBlockHashError(BeaconStateError),
IterationError {
unexpected_key: BytesKey,
},
} }
impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> { impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
@ -112,7 +115,6 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
let db = HotColdDB { let db = HotColdDB {
schema_version: CURRENT_SCHEMA_VERSION,
split: RwLock::new(Split::default()), split: RwLock::new(Split::default()),
cold_db: MemoryStore::open(), cold_db: MemoryStore::open(),
hot_db: MemoryStore::open(), hot_db: MemoryStore::open(),
@ -141,7 +143,6 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
let db = HotColdDB { let db = HotColdDB {
schema_version: CURRENT_SCHEMA_VERSION,
split: RwLock::new(Split::default()), split: RwLock::new(Split::default()),
cold_db: LevelDB::open(cold_path)?, cold_db: LevelDB::open(cold_path)?,
hot_db: LevelDB::open(hot_path)?, hot_db: LevelDB::open(hot_path)?,
@ -153,15 +154,15 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
}; };
// Ensure that the schema version of the on-disk database matches the software. // Ensure that the schema version of the on-disk database matches the software.
// In the future, this would be the spot to hook in auto-migration, etc. // If the version is mismatched, an automatic migration will be attempted.
if let Some(schema_version) = db.load_schema_version()? { if let Some(schema_version) = db.load_schema_version()? {
if schema_version != CURRENT_SCHEMA_VERSION { debug!(
return Err(HotColdDBError::UnsupportedSchemaVersion { db.log,
software_version: CURRENT_SCHEMA_VERSION, "Attempting schema migration";
disk_version: schema_version, "from_version" => schema_version.as_u64(),
} "to_version" => CURRENT_SCHEMA_VERSION.as_u64(),
.into()); );
} db.migrate_schema(schema_version, CURRENT_SCHEMA_VERSION)?;
} else { } else {
db.store_schema_version(CURRENT_SCHEMA_VERSION)?; db.store_schema_version(CURRENT_SCHEMA_VERSION)?;
} }
@ -178,14 +179,41 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
info!( info!(
db.log, db.log,
"Hot-Cold DB initialized"; "Hot-Cold DB initialized";
"version" => db.schema_version.0, "version" => CURRENT_SCHEMA_VERSION.as_u64(),
"split_slot" => split.slot, "split_slot" => split.slot,
"split_state" => format!("{:?}", split.state_root) "split_state" => format!("{:?}", split.state_root)
); );
*db.split.write() = split; *db.split.write() = split;
} }
// Finally, run a garbage collection pass.
db.remove_garbage()?;
Ok(db) Ok(db)
} }
/// Return an iterator over the state roots of all temporary states.
pub fn iter_temporary_state_roots<'a>(
&'a self,
) -> impl Iterator<Item = Result<Hash256, Error>> + 'a {
let column = DBColumn::BeaconStateTemporary;
let start_key =
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
let keys_iter = self.hot_db.keys_iter();
keys_iter.seek(&start_key);
keys_iter
.take_while(move |key| key.matches_column(column))
.map(move |bytes_key| {
bytes_key.remove_column(column).ok_or_else(|| {
HotColdDBError::IterationError {
unexpected_key: bytes_key,
}
.into()
})
})
}
} }
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> { impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
@ -391,39 +419,41 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let mut key_value_batch = Vec::with_capacity(batch.len()); let mut key_value_batch = Vec::with_capacity(batch.len());
for op in batch { for op in batch {
match op { match op {
StoreOp::PutBlock(block_hash, block) => { StoreOp::PutBlock(block_root, block) => {
let untyped_hash: Hash256 = (*block_hash).into(); key_value_batch.push(block.as_kv_store_op(*block_root));
key_value_batch.push(block.as_kv_store_op(untyped_hash));
} }
StoreOp::PutState(state_hash, state) => { StoreOp::PutState(state_root, state) => {
let untyped_hash: Hash256 = (*state_hash).into(); self.store_hot_state(state_root, state, &mut key_value_batch)?;
self.store_hot_state(&untyped_hash, state, &mut key_value_batch)?;
} }
StoreOp::PutStateSummary(state_hash, summary) => { StoreOp::PutStateSummary(state_root, summary) => {
let untyped_hash: Hash256 = (*state_hash).into(); key_value_batch.push(summary.as_kv_store_op(*state_root));
key_value_batch.push(summary.as_kv_store_op(untyped_hash));
} }
StoreOp::DeleteBlock(block_hash) => { StoreOp::PutStateTemporaryFlag(state_root) => {
let untyped_hash: Hash256 = (*block_hash).into(); key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root));
let key = }
get_key_for_col(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes());
StoreOp::DeleteStateTemporaryFlag(state_root) => {
let db_key =
get_key_for_col(TemporaryFlag::db_column().into(), state_root.as_bytes());
key_value_batch.push(KeyValueStoreOp::DeleteKey(db_key));
}
StoreOp::DeleteBlock(block_root) => {
let key = get_key_for_col(DBColumn::BeaconBlock.into(), block_root.as_bytes());
key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
} }
StoreOp::DeleteState(state_hash, slot) => { StoreOp::DeleteState(state_root, slot) => {
let untyped_hash: Hash256 = (*state_hash).into(); let state_summary_key =
let state_summary_key = get_key_for_col( get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes());
DBColumn::BeaconStateSummary.into(),
untyped_hash.as_bytes(),
);
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key)); key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key));
if *slot % E::slots_per_epoch() == 0 { if slot.map_or(true, |slot| slot % E::slots_per_epoch() == 0) {
let state_key = let state_key =
get_key_for_col(DBColumn::BeaconState.into(), untyped_hash.as_bytes()); get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes());
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key)); key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key));
} }
} }
@ -440,18 +470,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
for op in &batch { for op in &batch {
match op { match op {
StoreOp::PutBlock(block_hash, block) => { StoreOp::PutBlock(block_root, block) => {
let untyped_hash: Hash256 = (*block_hash).into(); guard.put(*block_root, (**block).clone());
guard.put(untyped_hash, block.clone());
} }
StoreOp::PutState(_, _) => (), StoreOp::PutState(_, _) => (),
StoreOp::PutStateSummary(_, _) => (), StoreOp::PutStateSummary(_, _) => (),
StoreOp::DeleteBlock(block_hash) => { StoreOp::PutStateTemporaryFlag(_) => (),
let untyped_hash: Hash256 = (*block_hash).into();
guard.pop(&untyped_hash); StoreOp::DeleteStateTemporaryFlag(_) => (),
StoreOp::DeleteBlock(block_root) => {
guard.pop(block_root);
} }
StoreOp::DeleteState(_, _) => (), StoreOp::DeleteState(_, _) => (),
@ -500,6 +532,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) -> Result<Option<BeaconState<E>>, Error> { ) -> Result<Option<BeaconState<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT); metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
// If the state is marked as temporary, do not return it. It will become visible
// only once its transaction commits and deletes its temporary flag.
if self.load_state_temporary_flag(state_root)?.is_some() {
return Ok(None);
}
if let Some(HotStateSummary { if let Some(HotStateSummary {
slot, slot,
latest_block_root, latest_block_root,
@ -785,7 +823,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
/// Store the database schema version. /// Store the database schema version.
fn store_schema_version(&self, schema_version: SchemaVersion) -> Result<(), Error> { pub(crate) fn store_schema_version(&self, schema_version: SchemaVersion) -> Result<(), Error> {
self.hot_db.put(&SCHEMA_VERSION_KEY, &schema_version) self.hot_db.put(&SCHEMA_VERSION_KEY, &schema_version)
} }
@ -846,6 +884,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.hot_db.get(state_root) self.hot_db.get(state_root)
} }
/// Load the temporary flag for a state root, if one exists.
///
/// Returns `Some` if the state is temporary, or `None` if the state is permanent or does not
/// exist -- you should call `load_hot_state_summary` to find out which.
pub fn load_state_temporary_flag(
&self,
state_root: &Hash256,
) -> Result<Option<TemporaryFlag>, Error> {
self.hot_db.get(state_root)
}
/// Check that the restore point frequency is valid. /// Check that the restore point frequency is valid.
/// ///
/// Specifically, check that it is: /// Specifically, check that it is:
@ -937,7 +986,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store.cold_db.do_atomically(cold_db_ops)?; store.cold_db.do_atomically(cold_db_ops)?;
// Delete the old summary, and the full state if we lie on an epoch boundary. // Delete the old summary, and the full state if we lie on an epoch boundary.
hot_db_ops.push(StoreOp::DeleteState(state_root.into(), slot)); hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
} }
// Warning: Critical section. We have to take care not to put any of the two databases in an // Warning: Critical section. We have to take care not to put any of the two databases in an
@ -1107,3 +1156,20 @@ impl StoreItem for RestorePointHash {
Ok(Self::from_ssz_bytes(bytes)?) Ok(Self::from_ssz_bytes(bytes)?)
} }
} }
#[derive(Debug, Clone, Copy, Default)]
pub struct TemporaryFlag;
impl StoreItem for TemporaryFlag {
fn db_column() -> DBColumn {
DBColumn::BeaconStateTemporary
}
fn as_store_bytes(&self) -> Vec<u8> {
vec![]
}
fn from_store_bytes(_: &[u8]) -> Result<Self, Error> {
Ok(TemporaryFlag)
}
}

View File

@ -5,13 +5,17 @@ use leveldb::database::batch::{Batch, Writebatch};
use leveldb::database::kv::KV; use leveldb::database::kv::KV;
use leveldb::database::Database; use leveldb::database::Database;
use leveldb::error::Error as LevelDBError; use leveldb::error::Error as LevelDBError;
use leveldb::iterator::{Iterable, KeyIterator};
use leveldb::options::{Options, ReadOptions, WriteOptions}; use leveldb::options::{Options, ReadOptions, WriteOptions};
use parking_lot::{Mutex, MutexGuard};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::path::Path; use std::path::Path;
/// A wrapped leveldb database. /// A wrapped leveldb database.
pub struct LevelDB<E: EthSpec> { pub struct LevelDB<E: EthSpec> {
db: Database<BytesKey>, db: Database<BytesKey>,
/// A mutex to synchronise sensitive read-write transactions.
transaction_mutex: Mutex<()>,
_phantom: PhantomData<E>, _phantom: PhantomData<E>,
} }
@ -23,9 +27,11 @@ impl<E: EthSpec> LevelDB<E> {
options.create_if_missing = true; options.create_if_missing = true;
let db = Database::open(path, options)?; let db = Database::open(path, options)?;
let transaction_mutex = Mutex::new(());
Ok(Self { Ok(Self {
db, db,
transaction_mutex,
_phantom: PhantomData, _phantom: PhantomData,
}) })
} }
@ -64,6 +70,10 @@ impl<E: EthSpec> LevelDB<E> {
metrics::stop_timer(timer); metrics::stop_timer(timer);
}) })
} }
pub fn keys_iter(&self) -> KeyIterator<BytesKey> {
self.db.keys_iter(self.read_options())
}
} }
impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> { impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
@ -138,11 +148,16 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
self.db.write(self.write_options(), &leveldb_batch)?; self.db.write(self.write_options(), &leveldb_batch)?;
Ok(()) Ok(())
} }
fn begin_rw_transaction(&self) -> MutexGuard<()> {
self.transaction_mutex.lock()
}
} }
impl<E: EthSpec> ItemStore<E> for LevelDB<E> {} impl<E: EthSpec> ItemStore<E> for LevelDB<E> {}
/// Used for keying leveldb. /// Used for keying leveldb.
#[derive(Debug, PartialEq)]
pub struct BytesKey { pub struct BytesKey {
key: Vec<u8>, key: Vec<u8>,
} }
@ -158,7 +173,23 @@ impl Key for BytesKey {
} }
impl BytesKey { impl BytesKey {
fn from_vec(key: Vec<u8>) -> Self { /// Return `true` iff this `BytesKey` was created with the given `column`.
pub fn matches_column(&self, column: DBColumn) -> bool {
self.key.starts_with(column.as_bytes())
}
/// Remove the column from a key, returning its `Hash256` portion.
pub fn remove_column(&self, column: DBColumn) -> Option<Hash256> {
if self.matches_column(column) {
let subkey = &self.key[column.as_bytes().len()..];
if subkey.len() == 32 {
return Some(Hash256::from_slice(subkey));
}
}
None
}
pub fn from_vec(key: Vec<u8>) -> Self {
Self { key } Self { key }
} }
} }

View File

@ -15,6 +15,7 @@ pub mod chunked_vector;
pub mod config; pub mod config;
pub mod errors; pub mod errors;
mod forwards_iter; mod forwards_iter;
mod garbage_collection;
pub mod hot_cold_store; pub mod hot_cold_store;
mod impls; mod impls;
mod leveldb_store; mod leveldb_store;
@ -22,11 +23,10 @@ mod memory_store;
mod metadata; mod metadata;
mod metrics; mod metrics;
mod partial_beacon_state; mod partial_beacon_state;
mod schema_change;
pub mod iter; pub mod iter;
use std::borrow::Cow;
pub use self::config::StoreConfig; pub use self::config::StoreConfig;
pub use self::hot_cold_store::{BlockReplay, HotColdDB, HotStateSummary, Split}; pub use self::hot_cold_store::{BlockReplay, HotColdDB, HotStateSummary, Split};
pub use self::leveldb_store::LevelDB; pub use self::leveldb_store::LevelDB;
@ -35,6 +35,7 @@ pub use self::partial_beacon_state::PartialBeaconState;
pub use errors::Error; pub use errors::Error;
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
pub use metrics::scrape_for_metrics; pub use metrics::scrape_for_metrics;
use parking_lot::MutexGuard;
pub use types::*; pub use types::*;
pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static { pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
@ -60,6 +61,12 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
/// Execute either all of the operations in `batch` or none at all, returning an error. /// Execute either all of the operations in `batch` or none at all, returning an error.
fn do_atomically(&self, batch: Vec<KeyValueStoreOp>) -> Result<(), Error>; fn do_atomically(&self, batch: Vec<KeyValueStoreOp>) -> Result<(), Error>;
/// Return a mutex guard that can be used to synchronize sensitive transactions.
///
/// This doesn't prevent other threads writing to the DB unless they also use
/// this method. In future we may implement a safer mandatory locking scheme.
fn begin_rw_transaction(&self) -> MutexGuard<()>;
} }
pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> { pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
@ -121,13 +128,14 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
/// Reified key-value storage operation. Helps in modifying the storage atomically. /// Reified key-value storage operation. Helps in modifying the storage atomically.
/// See also https://github.com/sigp/lighthouse/issues/692 /// See also https://github.com/sigp/lighthouse/issues/692
#[allow(clippy::large_enum_variant)]
pub enum StoreOp<'a, E: EthSpec> { pub enum StoreOp<'a, E: EthSpec> {
PutBlock(SignedBeaconBlockHash, SignedBeaconBlock<E>), PutBlock(Hash256, Box<SignedBeaconBlock<E>>),
PutState(BeaconStateHash, Cow<'a, BeaconState<E>>), PutState(Hash256, &'a BeaconState<E>),
PutStateSummary(BeaconStateHash, HotStateSummary), PutStateSummary(Hash256, HotStateSummary),
DeleteBlock(SignedBeaconBlockHash), PutStateTemporaryFlag(Hash256),
DeleteState(BeaconStateHash, Slot), DeleteStateTemporaryFlag(Hash256),
DeleteBlock(Hash256),
DeleteState(Hash256, Option<Slot>),
} }
/// A unique column identifier. /// A unique column identifier.
@ -146,6 +154,9 @@ pub enum DBColumn {
BeaconRestorePoint, BeaconRestorePoint,
/// For the mapping from state roots to their slots or summaries. /// For the mapping from state roots to their slots or summaries.
BeaconStateSummary, BeaconStateSummary,
/// For the list of temporary states stored during block import,
/// and then made non-temporary by the deletion of their state root from this column.
BeaconStateTemporary,
BeaconBlockRoots, BeaconBlockRoots,
BeaconStateRoots, BeaconStateRoots,
BeaconHistoricalRoots, BeaconHistoricalRoots,
@ -166,6 +177,7 @@ impl Into<&'static str> for DBColumn {
DBColumn::ForkChoice => "frk", DBColumn::ForkChoice => "frk",
DBColumn::BeaconRestorePoint => "brp", DBColumn::BeaconRestorePoint => "brp",
DBColumn::BeaconStateSummary => "bss", DBColumn::BeaconStateSummary => "bss",
DBColumn::BeaconStateTemporary => "bst",
DBColumn::BeaconBlockRoots => "bbr", DBColumn::BeaconBlockRoots => "bbr",
DBColumn::BeaconStateRoots => "bsr", DBColumn::BeaconStateRoots => "bsr",
DBColumn::BeaconHistoricalRoots => "bhr", DBColumn::BeaconHistoricalRoots => "bhr",
@ -175,6 +187,16 @@ impl Into<&'static str> for DBColumn {
} }
} }
impl DBColumn {
pub fn as_str(self) -> &'static str {
self.into()
}
pub fn as_bytes(self) -> &'static [u8] {
self.as_str().as_bytes()
}
}
/// An item that may stored in a `Store` by serializing and deserializing from bytes. /// An item that may stored in a `Store` by serializing and deserializing from bytes.
pub trait StoreItem: Sized { pub trait StoreItem: Sized {
/// Identifies which column this item should be placed in. /// Identifies which column this item should be placed in.

View File

@ -1,5 +1,5 @@
use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp}; use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp};
use parking_lot::RwLock; use parking_lot::{Mutex, MutexGuard, RwLock};
use std::collections::HashMap; use std::collections::HashMap;
use std::marker::PhantomData; use std::marker::PhantomData;
use types::*; use types::*;
@ -9,23 +9,16 @@ type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
/// A thread-safe `HashMap` wrapper. /// A thread-safe `HashMap` wrapper.
pub struct MemoryStore<E: EthSpec> { pub struct MemoryStore<E: EthSpec> {
db: RwLock<DBHashMap>, db: RwLock<DBHashMap>,
transaction_mutex: Mutex<()>,
_phantom: PhantomData<E>, _phantom: PhantomData<E>,
} }
impl<E: EthSpec> Clone for MemoryStore<E> {
fn clone(&self) -> Self {
Self {
db: RwLock::new(self.db.read().clone()),
_phantom: PhantomData,
}
}
}
impl<E: EthSpec> MemoryStore<E> { impl<E: EthSpec> MemoryStore<E> {
/// Create a new, empty database. /// Create a new, empty database.
pub fn open() -> Self { pub fn open() -> Self {
Self { Self {
db: RwLock::new(HashMap::new()), db: RwLock::new(HashMap::new()),
transaction_mutex: Mutex::new(()),
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
@ -87,6 +80,10 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
} }
Ok(()) Ok(())
} }
fn begin_rw_transaction(&self) -> MutexGuard<()> {
self.transaction_mutex.lock()
}
} }
impl<E: EthSpec> ItemStore<E> for MemoryStore<E> {} impl<E: EthSpec> ItemStore<E> for MemoryStore<E> {}

View File

@ -2,7 +2,7 @@ use crate::{DBColumn, Error, StoreItem};
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use types::Hash256; use types::Hash256;
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(1); pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(2);
// All the keys that get stored under the `BeaconMeta` column. // All the keys that get stored under the `BeaconMeta` column.
// //
@ -14,6 +14,12 @@ pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SchemaVersion(pub u64); pub struct SchemaVersion(pub u64);
impl SchemaVersion {
pub fn as_u64(self) -> u64 {
self.0
}
}
impl StoreItem for SchemaVersion { impl StoreItem for SchemaVersion {
fn db_column() -> DBColumn { fn db_column() -> DBColumn {
DBColumn::BeaconMeta DBColumn::BeaconMeta

View File

@ -0,0 +1,32 @@
//! Utilities for managing database schema changes.
use crate::hot_cold_store::{HotColdDB, HotColdDBError};
use crate::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use crate::{Error, ItemStore};
use types::EthSpec;
impl<E, Hot, Cold> HotColdDB<E, Hot, Cold>
where
E: EthSpec,
Hot: ItemStore<E>,
Cold: ItemStore<E>,
{
/// Migrate the database from one schema version to another, applying all requisite mutations.
pub fn migrate_schema(&self, from: SchemaVersion, to: SchemaVersion) -> Result<(), Error> {
match (from, to) {
// Migration from v0.3.0 to v0.3.x, adding the temporary states column.
// Nothing actually needs to be done, but once a DB uses v2 it shouldn't go back.
(SchemaVersion(1), SchemaVersion(2)) => {
self.store_schema_version(to)?;
Ok(())
}
// Migrating from the current schema version to iself is always OK, a no-op.
(_, _) if from == to && to == CURRENT_SCHEMA_VERSION => Ok(()),
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
current_version: from,
}
.into()),
}
}
}