Fix bug in database pruning (#1564)

## Issue Addressed

Closes #1488

## Proposed Changes

* Prevent the pruning algorithm from over-eagerly deleting states at skipped slots when they are shared with the canonical chain.
* Add `debug` logging to the pruning algorithm so we have so better chance of debugging future issues from logs.
* Modify the handling of the "finalized state" in the beacon chain, so that it's always the state at the first slot of the finalized epoch (previously it was the state at the finalized block). This gives database pruning a clearer and cleaner view of things, and will marginally impact the pruning of the op pool, observed proposers, etc (in ways that are safe as far as I can tell).
* Remove duplicated `RevertedFinalizedEpoch` check from `after_finalization`
* Delete useless and unused `max_finality_distance`
* Add tests that exercise pruning with shared states at skip slots
* Delete unnecessary `block_strategy` argument from `add_blocks` and friends in the test harness (will likely conflict with #1380 slightly, sorry @adaszko -- but we can fix that)
* Bonus: add a `BeaconChain::with_head` method. I didn't end up needing it, but it turned out quite nice, so I figured we could keep it?

## Additional Info

Any users who have experienced pruning errors on Medalla will need to resync after upgrading to a release including this change. This should end unbounded `chain_db` growth! 🎉
This commit is contained in:
Michael Sproul 2020-08-26 00:01:06 +00:00
parent 175471a64b
commit 4763f03dcc
7 changed files with 563 additions and 273 deletions

12
Cargo.lock generated
View File

@ -2,7 +2,7 @@
# It is not intended for manual editing.
[[package]]
name = "account_manager"
version = "0.2.6"
version = "0.2.7"
dependencies = [
"account_utils",
"bls",
@ -373,7 +373,7 @@ dependencies = [
[[package]]
name = "beacon_node"
version = "0.2.6"
version = "0.2.7"
dependencies = [
"beacon_chain",
"clap",
@ -530,7 +530,7 @@ dependencies = [
[[package]]
name = "boot_node"
version = "0.2.6"
version = "0.2.7"
dependencies = [
"beacon_node",
"clap",
@ -2537,7 +2537,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lcli"
version = "0.2.6"
version = "0.2.7"
dependencies = [
"bls",
"clap",
@ -2894,7 +2894,7 @@ dependencies = [
[[package]]
name = "lighthouse"
version = "0.2.6"
version = "0.2.7"
dependencies = [
"account_manager",
"account_utils",
@ -6036,7 +6036,7 @@ dependencies = [
[[package]]
name = "validator_client"
version = "0.2.6"
version = "0.2.7"
dependencies = [
"account_utils",
"bls",

View File

@ -426,6 +426,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(iter)
}
/// As for `rev_iter_state_roots` but starting from an arbitrary `BeaconState`.
pub fn rev_iter_state_roots_from<'a>(
&self,
state_root: Hash256,
state: &'a BeaconState<T::EthSpec>,
) -> impl Iterator<Item = Result<(Hash256, Slot), Error>> + 'a {
std::iter::once(Ok((state_root, state.slot)))
.chain(StateRootsIterator::new(self.store.clone(), state))
.map(|result| result.map_err(Into::into))
}
/// Returns the block at the given slot, if any. Only returns blocks in the canonical chain.
///
/// ## Errors
@ -479,30 +490,36 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// is the state as it was when the head block was received, which could be some slots prior to
/// now.
pub fn head(&self) -> Result<BeaconSnapshot<T::EthSpec>, Error> {
self.canonical_head
self.with_head(|head| Ok(head.clone_with_only_committee_caches()))
}
/// Apply a function to the canonical head without cloning it.
pub fn with_head<U>(
&self,
f: impl FnOnce(&BeaconSnapshot<T::EthSpec>) -> Result<U, Error>,
) -> Result<U, Error> {
let head_lock = self
.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)
.map(|v| v.clone_with_only_committee_caches())
.ok_or_else(|| Error::CanonicalHeadLockTimeout)?;
f(&head_lock)
}
/// Returns info representing the head block and state.
///
/// A summarized version of `Self::head` that involves less cloning.
pub fn head_info(&self) -> Result<HeadInfo, Error> {
let head = self
.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)?;
Ok(HeadInfo {
slot: head.beacon_block.slot(),
block_root: head.beacon_block_root,
state_root: head.beacon_state_root,
current_justified_checkpoint: head.beacon_state.current_justified_checkpoint,
finalized_checkpoint: head.beacon_state.finalized_checkpoint,
fork: head.beacon_state.fork,
genesis_time: head.beacon_state.genesis_time,
genesis_validators_root: head.beacon_state.genesis_validators_root,
self.with_head(|head| {
Ok(HeadInfo {
slot: head.beacon_block.slot(),
block_root: head.beacon_block_root,
state_root: head.beacon_state_root,
current_justified_checkpoint: head.beacon_state.current_justified_checkpoint,
finalized_checkpoint: head.beacon_state.finalized_checkpoint,
fork: head.beacon_state.fork,
genesis_time: head.beacon_state.genesis_time,
genesis_validators_root: head.beacon_state.genesis_validators_root,
})
})
}
@ -1746,7 +1763,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let beacon_block_root = self.fork_choice.write().get_head(self.slot()?)?;
let current_head = self.head_info()?;
let old_finalized_root = current_head.finalized_checkpoint.root;
let old_finalized_checkpoint = current_head.finalized_checkpoint;
if beacon_block_root == current_head.block_root {
return Ok(());
@ -1826,15 +1843,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
};
let old_finalized_epoch = current_head.finalized_checkpoint.epoch;
let new_finalized_epoch = new_head.beacon_state.finalized_checkpoint.epoch;
let finalized_root = new_head.beacon_state.finalized_checkpoint.root;
let new_finalized_checkpoint = new_head.beacon_state.finalized_checkpoint;
// State root of the finalized state on the epoch boundary, NOT the state
// of the finalized block. We need to use an iterator in case the state is beyond
// the reach of the new head's `state_roots` array.
let new_finalized_slot = new_finalized_checkpoint
.epoch
.start_slot(T::EthSpec::slots_per_epoch());
let new_finalized_state_root = process_results(
StateRootsIterator::new(self.store.clone(), &new_head.beacon_state),
|mut iter| {
iter.find_map(|(state_root, slot)| {
if slot == new_finalized_slot {
Some(state_root)
} else {
None
}
})
},
)?
.ok_or_else(|| Error::MissingFinalizedStateRoot(new_finalized_slot))?;
// It is an error to try to update to a head with a lesser finalized epoch.
if new_finalized_epoch < old_finalized_epoch {
if new_finalized_checkpoint.epoch < old_finalized_checkpoint.epoch {
return Err(Error::RevertedFinalizedEpoch {
previous_epoch: old_finalized_epoch,
new_epoch: new_finalized_epoch,
previous_epoch: old_finalized_checkpoint.epoch,
new_epoch: new_finalized_checkpoint.epoch,
});
}
@ -1873,11 +1907,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
});
if new_finalized_epoch != old_finalized_epoch {
if new_finalized_checkpoint.epoch != old_finalized_checkpoint.epoch {
self.after_finalization(
old_finalized_epoch,
finalized_root,
old_finalized_root.into(),
old_finalized_checkpoint,
new_finalized_checkpoint,
new_finalized_state_root,
)?;
}
@ -1905,68 +1939,53 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Performs pruning and finality-based optimizations.
fn after_finalization(
&self,
old_finalized_epoch: Epoch,
finalized_block_root: Hash256,
old_finalized_root: SignedBeaconBlockHash,
old_finalized_checkpoint: Checkpoint,
new_finalized_checkpoint: Checkpoint,
new_finalized_state_root: Hash256,
) -> Result<(), Error> {
let finalized_block = self
.store
.get_block(&finalized_block_root)?
.ok_or_else(|| Error::MissingBeaconBlock(finalized_block_root))?
.message;
self.fork_choice.write().prune()?;
let new_finalized_epoch = finalized_block.slot.epoch(T::EthSpec::slots_per_epoch());
self.observed_block_producers.prune(
new_finalized_checkpoint
.epoch
.start_slot(T::EthSpec::slots_per_epoch()),
);
if new_finalized_epoch < old_finalized_epoch {
Err(Error::RevertedFinalizedEpoch {
previous_epoch: old_finalized_epoch,
new_epoch: new_finalized_epoch,
self.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.map(|mut snapshot_cache| {
snapshot_cache.prune(new_finalized_checkpoint.epoch);
})
} else {
self.fork_choice.write().prune()?;
self.observed_block_producers
.prune(new_finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()));
self.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.map(|mut snapshot_cache| {
snapshot_cache.prune(new_finalized_epoch);
})
.unwrap_or_else(|| {
error!(
self.log,
"Failed to obtain cache write lock";
"lock" => "snapshot_cache",
"task" => "prune"
);
});
let finalized_state = self
.get_state(&finalized_block.state_root, Some(finalized_block.slot))?
.ok_or_else(|| Error::MissingBeaconState(finalized_block.state_root))?;
self.op_pool
.prune_all(&finalized_state, self.head_info()?.fork);
// TODO: configurable max finality distance
let max_finality_distance = 0;
self.store_migrator.process_finalization(
finalized_block.state_root,
finalized_state,
max_finality_distance,
Arc::clone(&self.head_tracker),
old_finalized_root,
finalized_block_root.into(),
);
let _ = self.event_handler.register(EventKind::BeaconFinalization {
epoch: new_finalized_epoch,
root: finalized_block_root,
.unwrap_or_else(|| {
error!(
self.log,
"Failed to obtain cache write lock";
"lock" => "snapshot_cache",
"task" => "prune"
);
});
Ok(())
}
let finalized_state = self
.get_state(&new_finalized_state_root, None)?
.ok_or_else(|| Error::MissingBeaconState(new_finalized_state_root))?;
self.op_pool
.prune_all(&finalized_state, self.head_info()?.fork);
self.store_migrator.process_finalization(
new_finalized_state_root.into(),
finalized_state,
self.head_tracker.clone(),
old_finalized_checkpoint,
new_finalized_checkpoint,
);
let _ = self.event_handler.register(EventKind::BeaconFinalization {
epoch: new_finalized_checkpoint.epoch,
root: new_finalized_checkpoint.root,
});
Ok(())
}
/// Returns `true` if the given block root has not been processed.

View File

@ -1,5 +1,6 @@
use crate::beacon_chain::ForkChoiceError;
use crate::eth1_chain::Error as Eth1ChainError;
use crate::migrate::PruningError;
use crate::naive_aggregation_pool::Error as NaiveAggregationError;
use crate::observed_attestations::Error as ObservedAttestationsError;
use crate::observed_attesters::Error as ObservedAttestersError;
@ -61,6 +62,7 @@ pub enum BeaconChainError {
requested_slot: Slot,
max_task_runtime: Duration,
},
MissingFinalizedStateRoot(Slot),
/// Returned when an internal check fails, indicating corrupt data.
InvariantViolated(String),
SszTypesError(SszTypesError),
@ -79,6 +81,7 @@ pub enum BeaconChainError {
ObservedAttestationsError(ObservedAttestationsError),
ObservedAttestersError(ObservedAttestersError),
ObservedBlockProducersError(ObservedBlockProducersError),
PruningError(PruningError),
ArithError(ArithError),
}
@ -94,6 +97,7 @@ easy_from_to!(ObservedAttestationsError, BeaconChainError);
easy_from_to!(ObservedAttestersError, BeaconChainError);
easy_from_to!(ObservedBlockProducersError, BeaconChainError);
easy_from_to!(BlockSignatureVerifierError, BeaconChainError);
easy_from_to!(PruningError, BeaconChainError);
easy_from_to!(ArithError, BeaconChainError);
#[derive(Debug)]

View File

@ -1,18 +1,34 @@
use crate::errors::BeaconChainError;
use crate::head_tracker::HeadTracker;
use parking_lot::Mutex;
use slog::{debug, warn, Logger};
use slog::{debug, error, warn, Logger};
use std::collections::{HashMap, HashSet};
use std::mem;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use store::hot_cold_store::{process_finalization, HotColdDBError};
use store::iter::{ParentRootBlockIterator, RootsIterator};
use store::iter::RootsIterator;
use store::{Error, ItemStore, StoreOp};
pub use store::{HotColdDB, MemoryStore};
use types::*;
use types::{BeaconState, EthSpec, Hash256, Slot};
use types::{
BeaconState, BeaconStateError, BeaconStateHash, Checkpoint, EthSpec, Hash256,
SignedBeaconBlockHash, Slot,
};
/// Logic errors that can occur during pruning, none of these should ever happen.
#[derive(Debug)]
pub enum PruningError {
IncorrectFinalizedState {
state_slot: Slot,
new_finalized_slot: Slot,
},
MissingInfoForCanonicalChain {
slot: Slot,
},
UnexpectedEqualStateRoots,
UnexpectedUnequalStateRoots,
}
/// Trait for migration processes that update the database upon finalization.
pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
@ -22,17 +38,16 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
fn process_finalization(
&self,
_state_root: Hash256,
_finalized_state_root: BeaconStateHash,
_new_finalized_state: BeaconState<E>,
_max_finality_distance: u64,
_head_tracker: Arc<HeadTracker>,
_old_finalized_block_hash: SignedBeaconBlockHash,
_new_finalized_block_hash: SignedBeaconBlockHash,
_old_finalized_checkpoint: Checkpoint,
_new_finalized_checkpoint: Checkpoint,
) {
}
/// Traverses live heads and prunes blocks and states of chains that we know can't be built
/// upon because finalization would prohibit it. This is an optimisation intended to save disk
/// upon because finalization would prohibit it. This is an optimisation intended to save disk
/// space.
///
/// Assumptions:
@ -40,37 +55,63 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
fn prune_abandoned_forks(
store: Arc<HotColdDB<E, Hot, Cold>>,
head_tracker: Arc<HeadTracker>,
old_finalized_block_hash: SignedBeaconBlockHash,
new_finalized_block_hash: SignedBeaconBlockHash,
new_finalized_slot: Slot,
new_finalized_state_hash: BeaconStateHash,
new_finalized_state: &BeaconState<E>,
old_finalized_checkpoint: Checkpoint,
new_finalized_checkpoint: Checkpoint,
log: &Logger,
) -> Result<(), BeaconChainError> {
// There will never be any blocks to prune if there is only a single head in the chain.
if head_tracker.heads().len() == 1 {
return Ok(());
}
let old_finalized_slot = store
.get_block(&old_finalized_block_hash.into())?
.ok_or_else(|| BeaconChainError::MissingBeaconBlock(old_finalized_block_hash.into()))?
.slot();
let old_finalized_slot = old_finalized_checkpoint
.epoch
.start_slot(E::slots_per_epoch());
let new_finalized_slot = new_finalized_checkpoint
.epoch
.start_slot(E::slots_per_epoch());
let new_finalized_block_hash = new_finalized_checkpoint.root.into();
// Collect hashes from new_finalized_block back to old_finalized_block (inclusive)
let mut found_block = false; // hack for `take_until`
let newly_finalized_blocks: HashMap<SignedBeaconBlockHash, Slot> =
ParentRootBlockIterator::new(&*store, new_finalized_block_hash.into())
.take_while(|result| match result {
Ok((block_hash, _)) => {
if found_block {
false
} else {
found_block |= *block_hash == old_finalized_block_hash.into();
true
}
}
Err(_) => true,
})
.map(|result| result.map(|(block_hash, block)| (block_hash.into(), block.slot())))
.collect::<Result<_, _>>()?;
// The finalized state must be for the epoch boundary slot, not the slot of the finalized
// block.
if new_finalized_state.slot != new_finalized_slot {
return Err(PruningError::IncorrectFinalizedState {
state_slot: new_finalized_state.slot,
new_finalized_slot,
}
.into());
}
debug!(
log,
"Starting database pruning";
"old_finalized_epoch" => old_finalized_checkpoint.epoch,
"old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root),
"new_finalized_epoch" => new_finalized_checkpoint.epoch,
"new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root),
);
// For each slot between the new finalized checkpoint and the old finalized checkpoint,
// collect the beacon block root and state root of the canonical chain.
let newly_finalized_chain: HashMap<Slot, (SignedBeaconBlockHash, BeaconStateHash)> =
std::iter::once(Ok((
new_finalized_slot,
(new_finalized_block_hash, new_finalized_state_hash),
)))
.chain(
RootsIterator::new(store.clone(), new_finalized_state).map(|res| {
res.map(|(block_root, state_root, slot)| {
(slot, (block_root.into(), state_root.into()))
})
}),
)
.take_while(|res| {
res.as_ref()
.map_or(true, |(slot, _)| *slot >= old_finalized_slot)
})
.collect::<Result<_, _>>()?;
// We don't know which blocks are shared among abandoned chains, so we buffer and delete
// everything in one fell swoop.
@ -79,75 +120,110 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
let mut abandoned_heads: HashSet<Hash256> = HashSet::new();
for (head_hash, head_slot) in head_tracker.heads() {
let mut potentially_abandoned_head: Option<Hash256> = Some(head_hash);
let mut potentially_abandoned_blocks: Vec<(
Slot,
Option<SignedBeaconBlockHash>,
Option<BeaconStateHash>,
)> = Vec::new();
let mut potentially_abandoned_head = Some(head_hash);
let mut potentially_abandoned_blocks = vec![];
let head_state_hash = store
.get_block(&head_hash)?
.ok_or_else(|| BeaconStateError::MissingBeaconBlock(head_hash.into()))?
.state_root();
// Iterate backwards from this head, staging blocks and states for deletion.
let iter = std::iter::once(Ok((head_hash, head_state_hash, head_slot)))
.chain(RootsIterator::from_block(Arc::clone(&store), head_hash)?);
.chain(RootsIterator::from_block(store.clone(), head_hash)?);
for maybe_tuple in iter {
let (block_hash, state_hash, slot) = maybe_tuple?;
if slot < old_finalized_slot {
// We must assume here any candidate chains include old_finalized_block_hash,
// i.e. there aren't any forks starting at a block that is a strict ancestor of
// old_finalized_block_hash.
break;
}
match newly_finalized_blocks.get(&block_hash.into()).copied() {
// Block is not finalized, mark it and its state for deletion
let (block_root, state_root, slot) = maybe_tuple?;
let block_root = SignedBeaconBlockHash::from(block_root);
let state_root = BeaconStateHash::from(state_root);
match newly_finalized_chain.get(&slot) {
// If there's no information about a slot on the finalized chain, then
// it should be because it's ahead of the new finalized slot. Stage
// the fork's block and state for possible deletion.
None => {
potentially_abandoned_blocks.push((
slot,
Some(block_hash.into()),
Some(state_hash.into()),
));
if slot > new_finalized_slot {
potentially_abandoned_blocks.push((
slot,
Some(block_root),
Some(state_root),
));
} else if slot >= old_finalized_slot {
return Err(PruningError::MissingInfoForCanonicalChain { slot }.into());
} else {
// We must assume here any candidate chains include the old finalized
// checkpoint, i.e. there aren't any forks starting at a block that is a
// strict ancestor of old_finalized_checkpoint.
warn!(
log,
"Found a chain that should already have been pruned";
"head_block_root" => format!("{:?}", head_hash),
"head_slot" => head_slot,
);
break;
}
}
Some(finalized_slot) => {
// Block root is finalized, and we have reached the slot it was finalized
// at: we've hit a shared part of the chain.
if finalized_slot == slot {
// The first finalized block of a candidate chain lies after (in terms
// of slots order) the newly finalized block. It's not a candidate for
// prunning.
if finalized_slot == new_finalized_slot {
Some((finalized_block_root, finalized_state_root)) => {
// This fork descends from a newly finalized block, we can stop.
if block_root == *finalized_block_root {
// Sanity check: if the slot and block root match, then the
// state roots should match too.
if state_root != *finalized_state_root {
return Err(PruningError::UnexpectedUnequalStateRoots.into());
}
// If the fork descends from the whole finalized chain,
// do not prune it. Otherwise continue to delete all
// of the blocks and states that have been staged for
// deletion so far.
if slot == new_finalized_slot {
potentially_abandoned_blocks.clear();
potentially_abandoned_head.take();
}
// If there are skipped slots on the fork to be pruned, then
// we will have just staged the common block for deletion.
// Unstage it.
else {
for (_, block_root, _) in
potentially_abandoned_blocks.iter_mut().rev()
{
if block_root.as_ref() == Some(finalized_block_root) {
*block_root = None;
} else {
break;
}
}
}
break;
}
// Block root is finalized, but we're at a skip slot: delete the state only.
else {
} else {
if state_root == *finalized_state_root {
return Err(PruningError::UnexpectedEqualStateRoots.into());
}
potentially_abandoned_blocks.push((
slot,
None,
Some(state_hash.into()),
Some(block_root),
Some(state_root),
));
}
}
}
}
abandoned_heads.extend(potentially_abandoned_head.into_iter());
if !potentially_abandoned_blocks.is_empty() {
if let Some(abandoned_head) = potentially_abandoned_head {
debug!(
log,
"Pruning head";
"head_block_root" => format!("{:?}", abandoned_head),
"head_slot" => head_slot,
);
abandoned_heads.insert(abandoned_head);
abandoned_blocks.extend(
potentially_abandoned_blocks
.iter()
.filter_map(|(_, maybe_block_hash, _)| *maybe_block_hash),
);
abandoned_states.extend(potentially_abandoned_blocks.iter().filter_map(
|(slot, _, maybe_state_hash)| match maybe_state_hash {
None => None,
Some(state_hash) => Some((*slot, *state_hash)),
},
|(slot, _, maybe_state_hash)| maybe_state_hash.map(|sr| (*slot, sr)),
));
}
}
@ -166,6 +242,8 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
head_tracker.remove_head(head_hash);
}
debug!(log, "Database pruning complete");
Ok(())
}
}
@ -184,48 +262,52 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Migrate<E, Hot, Cold> fo
/// Mostly useful for tests.
pub struct BlockingMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
db: Arc<HotColdDB<E, Hot, Cold>>,
log: Logger,
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Migrate<E, Hot, Cold>
for BlockingMigrator<E, Hot, Cold>
{
fn new(db: Arc<HotColdDB<E, Hot, Cold>>, _: Logger) -> Self {
BlockingMigrator { db }
fn new(db: Arc<HotColdDB<E, Hot, Cold>>, log: Logger) -> Self {
BlockingMigrator { db, log }
}
fn process_finalization(
&self,
state_root: Hash256,
finalized_state_root: BeaconStateHash,
new_finalized_state: BeaconState<E>,
_max_finality_distance: u64,
head_tracker: Arc<HeadTracker>,
old_finalized_block_hash: SignedBeaconBlockHash,
new_finalized_block_hash: SignedBeaconBlockHash,
old_finalized_checkpoint: Checkpoint,
new_finalized_checkpoint: Checkpoint,
) {
if let Err(e) = Self::prune_abandoned_forks(
self.db.clone(),
head_tracker,
old_finalized_block_hash,
new_finalized_block_hash,
new_finalized_state.slot,
finalized_state_root,
&new_finalized_state,
old_finalized_checkpoint,
new_finalized_checkpoint,
&self.log,
) {
eprintln!("Pruning error: {:?}", e);
error!(&self.log, "Pruning error"; "error" => format!("{:?}", e));
}
if let Err(e) = process_finalization(self.db.clone(), state_root, &new_finalized_state) {
// This migrator is only used for testing, so we just log to stderr without a logger.
eprintln!("Migration error: {:?}", e);
if let Err(e) = process_finalization(
self.db.clone(),
finalized_state_root.into(),
&new_finalized_state,
) {
error!(&self.log, "Migration error"; "error" => format!("{:?}", e));
}
}
}
type MpscSender<E> = mpsc::Sender<(
Hash256,
BeaconStateHash,
BeaconState<E>,
Arc<HeadTracker>,
SignedBeaconBlockHash,
SignedBeaconBlockHash,
Slot,
Checkpoint,
Checkpoint,
)>;
/// Migrator that runs a background thread to migrate state from the hot to the cold database.
@ -243,34 +325,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Migrate<E, Hot, Cold>
Self { db, tx_thread, log }
}
/// Perform the freezing operation on the database,
fn process_finalization(
&self,
finalized_state_root: Hash256,
finalized_state_root: BeaconStateHash,
new_finalized_state: BeaconState<E>,
max_finality_distance: u64,
head_tracker: Arc<HeadTracker>,
old_finalized_block_hash: SignedBeaconBlockHash,
new_finalized_block_hash: SignedBeaconBlockHash,
old_finalized_checkpoint: Checkpoint,
new_finalized_checkpoint: Checkpoint,
) {
if !self.needs_migration(new_finalized_state.slot, max_finality_distance) {
return;
}
let (ref mut tx, ref mut thread) = *self.tx_thread.lock();
let new_finalized_slot = new_finalized_state.slot;
if let Err(tx_err) = tx.send((
finalized_state_root,
new_finalized_state,
head_tracker,
old_finalized_block_hash,
new_finalized_block_hash,
new_finalized_slot,
old_finalized_checkpoint,
new_finalized_checkpoint,
)) {
let (new_tx, new_thread) = Self::spawn_thread(self.db.clone(), self.log.clone());
drop(mem::replace(tx, new_tx));
*tx = new_tx;
let old_thread = mem::replace(thread, new_thread);
// Join the old thread, which will probably have panicked, or may have
@ -290,53 +364,37 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Migrate<E, Hot, Cold>
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
/// Return true if a migration needs to be performed, given a new `finalized_slot`.
fn needs_migration(&self, finalized_slot: Slot, max_finality_distance: u64) -> bool {
let finality_distance = finalized_slot - self.db.get_split_slot();
finality_distance > max_finality_distance
}
#[allow(clippy::type_complexity)]
/// Spawn a new child thread to run the migration process.
///
/// Return a channel handle for sending new finalized states to the thread.
fn spawn_thread(
db: Arc<HotColdDB<E, Hot, Cold>>,
log: Logger,
) -> (
mpsc::Sender<(
Hash256,
BeaconState<E>,
Arc<HeadTracker>,
SignedBeaconBlockHash,
SignedBeaconBlockHash,
Slot,
)>,
thread::JoinHandle<()>,
) {
) -> (MpscSender<E>, thread::JoinHandle<()>) {
let (tx, rx) = mpsc::channel();
let thread = thread::spawn(move || {
while let Ok((
state_root,
state,
head_tracker,
old_finalized_block_hash,
new_finalized_block_hash,
new_finalized_slot,
old_finalized_checkpoint,
new_finalized_checkpoint,
)) = rx.recv()
{
match Self::prune_abandoned_forks(
db.clone(),
head_tracker,
old_finalized_block_hash,
new_finalized_block_hash,
new_finalized_slot,
state_root,
&state,
old_finalized_checkpoint,
new_finalized_checkpoint,
&log,
) {
Ok(()) => {}
Err(e) => warn!(log, "Block pruning failed: {:?}", e),
}
match process_finalization(db.clone(), state_root, &state) {
match process_finalization(db.clone(), state_root.into(), &state) {
Ok(()) => {}
Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
debug!(

View File

@ -298,7 +298,7 @@ where
let mut head_block_root = None;
loop {
let (block, new_state) = self.build_block(state.clone(), slot, block_strategy);
let (block, new_state) = self.build_block(state.clone(), slot);
if !predicate(&block, &new_state) {
break;
@ -339,7 +339,7 @@ where
let slot = self.chain.slot().unwrap();
self.build_block(state, slot, BlockStrategy::OnCanonicalHead)
self.build_block(state, slot)
}
/// A simple method to produce and process all attestation at the current slot. Always uses
@ -368,12 +368,9 @@ where
self.chain.head().unwrap().beacon_state
}
/// Adds a single block (synchronously) onto either the canonical chain (block_strategy ==
/// OnCanonicalHead) or a fork (block_strategy == ForkCanonicalChainAt).
pub fn add_block(
&self,
state: &BeaconState<E>,
block_strategy: BlockStrategy,
slot: Slot,
validators: &[usize],
) -> (SignedBeaconBlockHash, BeaconState<E>) {
@ -381,7 +378,7 @@ where
self.advance_slot();
}
let (block, new_state) = self.build_block(state.clone(), slot, block_strategy);
let (block, new_state) = self.build_block(state.clone(), slot);
let block_root = self
.chain
@ -395,15 +392,14 @@ where
(block_root.into(), new_state)
}
#[allow(clippy::type_complexity)]
/// `add_block()` repeated `num_blocks` times.
#[allow(clippy::type_complexity)]
pub fn add_blocks(
&self,
mut state: BeaconState<E>,
mut slot: Slot,
num_blocks: usize,
attesting_validators: &[usize],
block_strategy: BlockStrategy,
) -> (
HashMap<Slot, SignedBeaconBlockHash>,
HashMap<Slot, BeaconStateHash>,
@ -414,8 +410,7 @@ where
let mut blocks: HashMap<Slot, SignedBeaconBlockHash> = HashMap::with_capacity(num_blocks);
let mut states: HashMap<Slot, BeaconStateHash> = HashMap::with_capacity(num_blocks);
for _ in 0..num_blocks {
let (new_root_hash, new_state) =
self.add_block(&state, block_strategy, slot, attesting_validators);
let (new_root_hash, new_state) = self.add_block(&state, slot, attesting_validators);
blocks.insert(slot, new_root_hash);
states.insert(slot, new_state.tree_hash_root().into());
state = new_state;
@ -426,7 +421,6 @@ where
}
#[allow(clippy::type_complexity)]
/// A wrapper on `add_blocks()` to avoid passing enums explicitly.
pub fn add_canonical_chain_blocks(
&self,
state: BeaconState<E>,
@ -440,18 +434,10 @@ where
SignedBeaconBlockHash,
BeaconState<E>,
) {
let block_strategy = BlockStrategy::OnCanonicalHead;
self.add_blocks(
state,
slot,
num_blocks,
attesting_validators,
block_strategy,
)
self.add_blocks(state, slot, num_blocks, attesting_validators)
}
#[allow(clippy::type_complexity)]
/// A wrapper on `add_blocks()` to avoid passing enums explicitly.
pub fn add_stray_blocks(
&self,
state: BeaconState<E>,
@ -465,17 +451,7 @@ where
SignedBeaconBlockHash,
BeaconState<E>,
) {
let block_strategy = BlockStrategy::ForkCanonicalChainAt {
previous_slot: slot,
first_slot: slot + 2,
};
self.add_blocks(
state,
slot + 2,
num_blocks,
attesting_validators,
block_strategy,
)
self.add_blocks(state, slot + 2, num_blocks, attesting_validators)
}
/// Returns a newly created block, signed by the proposer for the given slot.
@ -483,8 +459,8 @@ where
&self,
mut state: BeaconState<E>,
slot: Slot,
block_strategy: BlockStrategy,
) -> (SignedBeaconBlock<E>, BeaconState<E>) {
assert_ne!(slot, 0);
if slot < state.slot {
panic!("produce slot cannot be prior to the state slot");
}
@ -498,15 +474,9 @@ where
.build_all_caches(&self.spec)
.expect("should build caches");
let proposer_index = match block_strategy {
BlockStrategy::OnCanonicalHead => self
.chain
.block_proposer(slot)
.expect("should get block proposer from chain"),
_ => state
.get_beacon_proposer_index(slot, &self.spec)
.expect("should get block proposer from state"),
};
let proposer_index = state
.get_beacon_proposer_index(slot, &self.spec)
.expect("should get block proposer from state");
let sk = &self.keypairs[proposer_index].sk;
let fork = &state.fork;

View File

@ -1292,6 +1292,226 @@ fn prunes_skipped_slots_states() {
}
}
fn check_all_blocks_exist<'a>(
harness: &TestHarness,
blocks: impl Iterator<Item = &'a SignedBeaconBlockHash>,
) {
for &block_hash in blocks {
let block = harness.chain.get_block(&block_hash.into()).unwrap();
assert!(
block.is_some(),
"expected block {:?} to be in DB",
block_hash
);
}
}
fn check_all_states_exist<'a>(
harness: &TestHarness,
states: impl Iterator<Item = &'a BeaconStateHash>,
) {
for &state_hash in states {
let state = harness.chain.get_state(&state_hash.into(), None).unwrap();
assert!(
state.is_some(),
"expected state {:?} to be in DB",
state_hash,
);
}
}
// Check that none of the given states exist in the database.
fn check_no_states_exist<'a>(
harness: &TestHarness,
states: impl Iterator<Item = &'a BeaconStateHash>,
) {
for &state_root in states {
assert!(
harness
.chain
.get_state(&state_root.into(), None)
.unwrap()
.is_none(),
"state {:?} should not be in the DB",
state_root
);
}
}
// Check that none of the given blocks exist in the database.
fn check_no_blocks_exist<'a>(
harness: &TestHarness,
blocks: impl Iterator<Item = &'a SignedBeaconBlockHash>,
) {
for &block_hash in blocks {
let block = harness.chain.get_block(&block_hash.into()).unwrap();
assert!(
block.is_none(),
"did not expect block {:?} to be in the DB",
block_hash
);
}
}
#[test]
fn prune_single_block_fork() {
let slots_per_epoch = E::slots_per_epoch() as usize;
pruning_test(3 * slots_per_epoch, 1, slots_per_epoch, 0, 1);
}
#[test]
fn prune_single_block_long_skip() {
let slots_per_epoch = E::slots_per_epoch() as usize;
pruning_test(
2 * slots_per_epoch,
1,
slots_per_epoch,
2 * slots_per_epoch as u64,
1,
);
}
#[test]
fn prune_shared_skip_states_mid_epoch() {
let slots_per_epoch = E::slots_per_epoch() as usize;
pruning_test(
slots_per_epoch + slots_per_epoch / 2,
1,
slots_per_epoch,
2,
slots_per_epoch - 1,
);
}
#[test]
fn prune_shared_skip_states_epoch_boundaries() {
let slots_per_epoch = E::slots_per_epoch() as usize;
pruning_test(slots_per_epoch - 1, 1, slots_per_epoch, 2, slots_per_epoch);
pruning_test(slots_per_epoch - 1, 2, slots_per_epoch, 1, slots_per_epoch);
pruning_test(
2 * slots_per_epoch + slots_per_epoch / 2,
slots_per_epoch as u64 / 2,
slots_per_epoch,
slots_per_epoch as u64 / 2 + 1,
slots_per_epoch,
);
pruning_test(
2 * slots_per_epoch + slots_per_epoch / 2,
slots_per_epoch as u64 / 2,
slots_per_epoch,
slots_per_epoch as u64 / 2 + 1,
slots_per_epoch,
);
pruning_test(
2 * slots_per_epoch - 1,
slots_per_epoch as u64,
1,
0,
2 * slots_per_epoch,
);
}
/// Generic harness for pruning tests.
fn pruning_test(
// Number of blocks to start the chain with before forking.
num_initial_blocks: usize,
// Number of skip slots on the main chain after the initial blocks.
num_canonical_skips: u64,
// Number of blocks on the main chain after the skip, but before the finalisation-triggering
// blocks.
num_canonical_middle_blocks: usize,
// Number of skip slots on the fork chain after the initial blocks.
num_fork_skips: u64,
// Number of blocks on the fork chain after the skips.
num_fork_blocks: usize,
) {
const VALIDATOR_COUNT: usize = 24;
const VALIDATOR_SUPERMAJORITY: usize = (VALIDATOR_COUNT / 3) * 2;
const HONEST_VALIDATOR_COUNT: usize = VALIDATOR_SUPERMAJORITY;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), VALIDATOR_COUNT);
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
let faulty_validators: Vec<usize> = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect();
let slots_per_epoch = MinimalEthSpec::slots_per_epoch() as usize;
let (_, _, divergence_slot, _, divergence_state) = harness.add_blocks(
harness.get_head_state(),
harness.get_chain_slot(),
num_initial_blocks,
&honest_validators,
);
let (_, _, canonical_slot, _, canonical_state) = harness.add_blocks(
divergence_state.clone(),
divergence_slot + num_canonical_skips,
num_canonical_middle_blocks,
&honest_validators,
);
let (stray_blocks, stray_states, stray_slot, _, stray_head_state) = harness.add_blocks(
divergence_state.clone(),
divergence_slot + num_fork_skips,
num_fork_blocks,
&faulty_validators,
);
let stray_head_state_root = stray_states[&(stray_slot - 1)];
let stray_states = harness
.chain
.rev_iter_state_roots_from(stray_head_state_root.into(), &stray_head_state)
.map(Result::unwrap)
.map(|(state_root, _)| state_root.into())
.collect::<HashSet<_>>();
check_all_blocks_exist(&harness, stray_blocks.values());
check_all_states_exist(&harness, stray_states.iter());
let chain_dump = harness.chain.chain_dump().unwrap();
assert_eq!(
get_finalized_epoch_boundary_blocks(&chain_dump),
vec![Hash256::zero().into()].into_iter().collect(),
);
// Trigger finalization
let num_finalization_blocks = 4 * slots_per_epoch;
let (_, _, _, _, _) = harness.add_blocks(
canonical_state,
canonical_slot,
num_finalization_blocks,
&honest_validators,
);
// Check that finalization has advanced past the divergence slot.
assert!(
harness
.chain
.head_info()
.unwrap()
.finalized_checkpoint
.epoch
.start_slot(E::slots_per_epoch())
> divergence_slot
);
check_chain_dump(
&harness,
(num_initial_blocks + num_canonical_middle_blocks + num_finalization_blocks + 1) as u64,
);
let all_canonical_states = harness
.chain
.rev_iter_state_roots()
.unwrap()
.map(Result::unwrap)
.map(|(state_root, _)| state_root.into())
.collect::<HashSet<BeaconStateHash>>();
check_all_states_exist(&harness, all_canonical_states.iter());
check_no_states_exist(&harness, stray_states.difference(&all_canonical_states));
check_no_blocks_exist(&harness, stray_blocks.values());
}
/// Check that the head state's slot matches `expected_slot`.
fn check_slot(harness: &TestHarness, expected_slot: u64) {
let state = &harness.chain.head().expect("should get head").beacon_state;
@ -1396,18 +1616,31 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
}
}
/// Check that state and block root iterators can reach genesis
/// Check that every state from the canonical chain is in the database, and that the
/// reverse state and block root iterators reach genesis.
fn check_iterators(harness: &TestHarness) {
assert_eq!(
harness
.chain
.rev_iter_state_roots()
.expect("should get iter")
.last()
.map(Result::unwrap)
.map(|(_, slot)| slot),
Some(Slot::new(0))
);
let mut min_slot = None;
for (state_root, slot) in harness
.chain
.rev_iter_state_roots()
.expect("should get iter")
.map(Result::unwrap)
{
assert!(
harness
.chain
.store
.get_state(&state_root, Some(slot))
.unwrap()
.is_some(),
"state {:?} from canonical chain should be in DB",
state_root
);
min_slot = Some(slot);
}
// Assert that we reached genesis.
assert_eq!(min_slot, Some(Slot::new(0)));
// Assert that the block root iterator reaches genesis.
assert_eq!(
harness
.chain

View File

@ -14,7 +14,7 @@ use crate::{
};
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use slog::{debug, error, trace, warn, Logger};
use slog::{debug, error, info, trace, warn, Logger};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use state_processing::{
@ -147,6 +147,12 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
// Load the previous split slot from the database (if any). This ensures we can
// stop and restart correctly.
if let Some(split) = db.load_split()? {
info!(
db.log,
"Hot-Cold DB initialized";
"split_slot" => split.slot,
"split_state" => format!("{:?}", split.state_root)
);
*db.split.write() = split;
}
Ok(db)