Add methods to delete blocks and states from disk (#843)

Closes #833
This commit is contained in:
Michael Sproul 2020-03-04 16:48:35 +11:00 committed by GitHub
parent 12d9b42188
commit 1f16d8fe4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 142 additions and 9 deletions

View File

@ -10,7 +10,10 @@ use beacon_chain::AttestationProcessingOutcome;
use rand::Rng;
use sloggers::{null::NullLoggerBuilder, Build};
use std::sync::Arc;
use store::{DiskStore, Store, StoreConfig};
use store::{
iter::{BlockRootsIterator, StateRootsIterator},
DiskStore, Store, StoreConfig,
};
use tempfile::{tempdir, TempDir};
use tree_hash::TreeHash;
use types::test_utils::{SeedableRng, XorShiftRng};
@ -320,6 +323,102 @@ fn epoch_boundary_state_attestation_processing() {
assert!(checked_pre_fin);
}
#[test]
fn delete_blocks_and_states() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), VALIDATOR_COUNT);
let unforked_blocks = 4 * E::slots_per_epoch();
// Finalize an initial portion of the chain.
harness.extend_chain(
unforked_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
);
// Create a fork post-finalization.
let two_thirds = (VALIDATOR_COUNT / 3) * 2;
let honest_validators: Vec<usize> = (0..two_thirds).collect();
let faulty_validators: Vec<usize> = (two_thirds..VALIDATOR_COUNT).collect();
// NOTE: should remove this -1 and/or write a similar test once #845 is resolved
// https://github.com/sigp/lighthouse/issues/845
let fork_blocks = 2 * E::slots_per_epoch() - 1;
let (honest_head, faulty_head) = harness.generate_two_forks_by_skipping_a_block(
&honest_validators,
&faulty_validators,
fork_blocks as usize,
fork_blocks as usize,
);
assert!(honest_head != faulty_head, "forks should be distinct");
let head_info = harness.chain.head_info().expect("should get head");
assert_eq!(head_info.slot, unforked_blocks + fork_blocks);
assert_eq!(
head_info.block_root, honest_head,
"the honest chain should be the canonical chain",
);
let faulty_head_block = store
.get_block(&faulty_head)
.expect("no errors")
.expect("faulty head block exists");
let faulty_head_state = store
.get_state(
&faulty_head_block.state_root(),
Some(faulty_head_block.slot()),
)
.expect("no db error")
.expect("faulty head state exists");
let states_to_delete = StateRootsIterator::new(store.clone(), &faulty_head_state)
.take_while(|(_, slot)| *slot > unforked_blocks)
.collect::<Vec<_>>();
// Delete faulty fork
// Attempting to load those states should find them unavailable
for (state_root, slot) in &states_to_delete {
assert_eq!(store.delete_state(state_root, *slot), Ok(()));
assert_eq!(store.get_state(state_root, Some(*slot)), Ok(None));
}
// Double-deleting should also be OK (deleting non-existent things is fine)
for (state_root, slot) in &states_to_delete {
assert_eq!(store.delete_state(state_root, *slot), Ok(()));
}
// Deleting the blocks from the fork should remove them completely
let blocks_to_delete = BlockRootsIterator::new(store.clone(), &faulty_head_state)
// Extra +1 here accounts for the skipped slot that started this fork
.take_while(|(_, slot)| *slot > unforked_blocks + 1)
.collect::<Vec<_>>();
for (block_root, _) in blocks_to_delete {
assert_eq!(store.delete_block(&block_root), Ok(()));
assert_eq!(store.get_block(&block_root), Ok(None));
}
// Deleting frozen states should do nothing
let split_slot = store.get_split_slot();
let finalized_states = harness
.chain
.rev_iter_state_roots()
.expect("rev iter ok")
.filter(|(_, slot)| *slot < split_slot);
for (state_root, slot) in finalized_states {
assert_eq!(store.delete_state(&state_root, slot), Ok(()));
}
// After all that, the chain dump should still be OK
check_chain_dump(&harness, unforked_blocks + fork_blocks + 1);
}
/// Check that the head state's slot matches `expected_slot`.
fn check_slot(harness: &TestHarness, expected_slot: u64) {
let state = &harness.chain.head().expect("should get head").beacon_state;

View File

@ -138,6 +138,12 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
}
}
/// Delete a block from the store and the block cache.
fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
self.block_cache.lock().pop(block_root);
self.delete::<SignedBeaconBlock<E>>(block_root)
}
/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: BeaconState<E>) -> Result<(), Error> {
if state.slot < self.get_split_slot() {
@ -181,6 +187,29 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
}
}
/// Delete a state, ensuring it is removed from the LRU cache, as well as from on-disk.
///
/// It is assumed that all states being deleted reside in the hot DB, even if their slot is less
/// than the split point. You shouldn't delete states from the finalized portion of the chain
/// (which are frozen, and won't be deleted), or valid descendents of the finalized checkpoint
/// (which will be deleted by this function but shouldn't be).
fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
// Delete the state summary.
self.hot_db
.key_delete(DBColumn::BeaconStateSummary.into(), state_root.as_bytes())?;
// Delete the full state if it lies on an epoch boundary.
if slot % E::slots_per_epoch() == 0 {
self.hot_db
.key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())?;
}
// Delete from the cache.
self.state_cache.lock().pop(state_root);
Ok(())
}
/// Advance the split point of the store, moving new finalized states to the freezer.
fn freeze_to_state(
store: Arc<Self>,
@ -232,10 +261,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
store.store_cold_state_slot(&state_root, slot)?;
// Delete the old summary, and the full state if we lie on an epoch boundary.
to_delete.push((DBColumn::BeaconStateSummary, state_root));
if slot % E::slots_per_epoch() == 0 {
to_delete.push((DBColumn::BeaconState, state_root));
}
to_delete.push((state_root, slot));
}
// 2. Update the split slot
@ -246,10 +272,8 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
store.store_split()?;
// 3. Delete from the hot DB
for (column, state_root) in to_delete {
store
.hot_db
.key_delete(column.into(), state_root.as_bytes())?;
for (state_root, slot) in to_delete {
store.delete_state(&state_root, slot)?;
}
debug!(

View File

@ -91,6 +91,11 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
self.get(block_root)
}
/// Delete a block from the store.
fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
self.delete::<SignedBeaconBlock<E>>(block_root)
}
/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: BeaconState<E>) -> Result<(), Error>;
@ -123,6 +128,11 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
self.get_state(state_root, slot)
}
/// Delete a state from the store.
fn delete_state(&self, state_root: &Hash256, _slot: Slot) -> Result<(), Error> {
self.key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())
}
/// (Optionally) Move all data before the frozen slot to the freezer database.
fn freeze_to_state(
_store: Arc<Self>,