Add block roots heal logic in v18 schema migration. (#4875)

## Issue Addressed

Fixes #4697. 

This also unblocks the state pruning PR (#4835). Because self healing breaks if state pruning is applied to a database with missing block roots.

## Proposed Changes

- Fill in the missing block roots between last restore point slot and split slot when upgrading to latest database version.
This commit is contained in:
Jimmy Chen 2023-10-25 03:42:24 +00:00
parent a228e61773
commit d4f26ee123
3 changed files with 210 additions and 1 deletions

View File

@ -51,6 +51,9 @@ pub fn upgrade_to_v18<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
db.heal_freezer_block_roots()?;
info!(log, "Healed freezer block roots");
// No-op, even if Deneb has already occurred. The database is probably borked in this case, but
// *maybe* the fork recovery will revert the minority fork and succeed.
if let Some(deneb_fork_epoch) = db.get_chain_spec().deneb_fork_epoch {

View File

@ -29,8 +29,10 @@ use std::sync::Arc;
use std::time::Duration;
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::{
chunked_vector::{chunk_key, Field},
get_key_for_col,
iter::{BlockRootsIterator, StateRootsIterator},
HotColdDB, LevelDB, StoreConfig,
DBColumn, HotColdDB, KeyValueStore, KeyValueStoreOp, LevelDB, StoreConfig,
};
use tempfile::{tempdir, TempDir};
use tokio::time::sleep;
@ -104,6 +106,181 @@ fn get_harness_generic(
harness
}
/// Tests that `store.heal_freezer_block_roots` inserts block roots between last restore point
/// slot and the split slot.
#[tokio::test]
async fn heal_freezer_block_roots() {
// chunk_size is hard-coded to 128
let num_blocks_produced = E::slots_per_epoch() * 20;
let db_path = tempdir().unwrap();
let store = get_store_generic(
&db_path,
StoreConfig {
slots_per_restore_point: 2 * E::slots_per_epoch(),
..Default::default()
},
test_spec::<E>(),
);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
let split_slot = store.get_split_slot();
assert_eq!(split_slot, 18 * E::slots_per_epoch());
// Do a heal before deleting to make sure that it doesn't break.
let last_restore_point_slot = Slot::new(16 * E::slots_per_epoch());
store.heal_freezer_block_roots().unwrap();
check_freezer_block_roots(&harness, last_restore_point_slot, split_slot);
// Delete block roots between `last_restore_point_slot` and `split_slot`.
let chunk_index = <store::chunked_vector::BlockRoots as Field<E>>::chunk_index(
last_restore_point_slot.as_usize(),
);
let key_chunk = get_key_for_col(DBColumn::BeaconBlockRoots.as_str(), &chunk_key(chunk_index));
store
.cold_db
.do_atomically(vec![KeyValueStoreOp::DeleteKey(key_chunk)])
.unwrap();
let block_root_err = store
.forwards_block_roots_iterator_until(
last_restore_point_slot,
last_restore_point_slot + 1,
|| unreachable!(),
&harness.chain.spec,
)
.unwrap()
.next()
.unwrap()
.unwrap_err();
assert!(matches!(block_root_err, store::Error::NoContinuationData));
// Re-insert block roots
store.heal_freezer_block_roots().unwrap();
check_freezer_block_roots(&harness, last_restore_point_slot, split_slot);
// Run for another two epochs to check that the invariant is maintained.
let additional_blocks_produced = 2 * E::slots_per_epoch();
harness
.extend_slots(additional_blocks_produced as usize)
.await;
check_finalization(&harness, num_blocks_produced + additional_blocks_produced);
check_split_slot(&harness, store);
check_chain_dump(
&harness,
num_blocks_produced + additional_blocks_produced + 1,
);
check_iterators(&harness);
}
/// Tests that `store.heal_freezer_block_roots` inserts block roots between last restore point
/// slot and the split slot.
#[tokio::test]
async fn heal_freezer_block_roots_with_skip_slots() {
// chunk_size is hard-coded to 128
let num_blocks_produced = E::slots_per_epoch() * 20;
let db_path = tempdir().unwrap();
let store = get_store_generic(
&db_path,
StoreConfig {
slots_per_restore_point: 2 * E::slots_per_epoch(),
..Default::default()
},
test_spec::<E>(),
);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let current_state = harness.get_current_state();
let state_root = harness.get_current_state().tree_hash_root();
let all_validators = &harness.get_all_validators();
harness
.add_attested_blocks_at_slots(
current_state,
state_root,
&(1..=num_blocks_produced)
.filter(|i| i % 12 != 0)
.map(Slot::new)
.collect::<Vec<_>>(),
all_validators,
)
.await;
// split slot should be 18 here
let split_slot = store.get_split_slot();
assert_eq!(split_slot, 18 * E::slots_per_epoch());
let last_restore_point_slot = Slot::new(16 * E::slots_per_epoch());
let chunk_index = <store::chunked_vector::BlockRoots as Field<E>>::chunk_index(
last_restore_point_slot.as_usize(),
);
let key_chunk = get_key_for_col(DBColumn::BeaconBlockRoots.as_str(), &chunk_key(chunk_index));
store
.cold_db
.do_atomically(vec![KeyValueStoreOp::DeleteKey(key_chunk)])
.unwrap();
let block_root_err = store
.forwards_block_roots_iterator_until(
last_restore_point_slot,
last_restore_point_slot + 1,
|| unreachable!(),
&harness.chain.spec,
)
.unwrap()
.next()
.unwrap()
.unwrap_err();
assert!(matches!(block_root_err, store::Error::NoContinuationData));
// heal function
store.heal_freezer_block_roots().unwrap();
check_freezer_block_roots(&harness, last_restore_point_slot, split_slot);
// Run for another two epochs to check that the invariant is maintained.
let additional_blocks_produced = 2 * E::slots_per_epoch();
harness
.extend_slots(additional_blocks_produced as usize)
.await;
check_finalization(&harness, num_blocks_produced + additional_blocks_produced);
check_split_slot(&harness, store);
check_iterators(&harness);
}
fn check_freezer_block_roots(
harness: &TestHarness,
last_restore_point_slot: Slot,
split_slot: Slot,
) {
for slot in (last_restore_point_slot.as_u64()..split_slot.as_u64()).map(Slot::new) {
let (block_root, result_slot) = harness
.chain
.store
.forwards_block_roots_iterator_until(slot, slot, || unreachable!(), &harness.chain.spec)
.unwrap()
.next()
.unwrap()
.unwrap();
assert_eq!(slot, result_slot);
let expected_block_root = harness
.chain
.block_root_at_slot(slot, WhenSlotSkipped::Prev)
.unwrap()
.unwrap();
assert_eq!(expected_block_root, block_root);
}
}
#[tokio::test]
async fn full_participation_no_skips() {
let num_blocks_produced = E::slots_per_epoch() * 5;

View File

@ -2218,6 +2218,35 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}
pub fn heal_freezer_block_roots(&self) -> Result<(), Error> {
let split = self.get_split_info();
let last_restore_point_slot = (split.slot - 1) / self.config.slots_per_restore_point
* self.config.slots_per_restore_point;
// Load split state (which has access to block roots).
let (_, split_state) = self
.get_advanced_hot_state(split.block_root, split.slot, split.state_root)?
.ok_or(HotColdDBError::MissingSplitState(
split.state_root,
split.slot,
))?;
let mut batch = vec![];
let mut chunk_writer = ChunkWriter::<BlockRoots, _, _>::new(
&self.cold_db,
last_restore_point_slot.as_usize(),
)?;
for slot in (last_restore_point_slot.as_u64()..split.slot.as_u64()).map(Slot::new) {
let block_root = *split_state.get_block_root(slot)?;
chunk_writer.set(slot.as_usize(), block_root, &mut batch)?;
}
chunk_writer.write(&mut batch)?;
self.cold_db.do_atomically(batch)?;
Ok(())
}
}
/// Advance the split point of the store, moving new finalized states to the freezer.