Memory usage reduction (#1522)
## Issue Addressed

NA

## Proposed Changes

- Adds a new function to allow getting a state with a bad state root history for attestation verification. This reduces unnecessary tree hashing during attestation processing, which accounted for 23% of memory allocations (by bytes) in a recent `heaptrack` observation. (See the sketch after the commit metadata below.)
- Don't clone unnecessary caches on intermediate epoch-boundary states during block processing; only the committee caches are retained.
- Reject blocks that are already known to fork choice earlier during gossip processing, instead of waiting until after the parent state has been loaded (this only matters in an edge case, such as immediately after a reboot when the `observed_block_producers` cache is empty).
- Avoid multiple re-allocations by creating a "forced" exact-size iterator.

## Additional Info

NA
Commit 61d5b592cb (parent 3c689a6837)
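Before the hunks, a brief note on the mechanism the first bullet relies on: per-slot processing only needs to tree-hash the state when the caller cannot supply its root. The snippet below is a hedged paraphrase of that behaviour, not the actual `state_processing` source, and the helper name is invented for illustration.

```rust
use types::{BeaconState, EthSpec, Hash256};

// Hedged paraphrase of the behaviour `BlockReplay::InconsistentStateRoots` exploits;
// the real logic lives in `state_processing::per_slot_processing`.
fn state_root_for_slot_advance<E: EthSpec>(
    state: &mut BeaconState<E>,
    known_root: Option<Hash256>,
) -> Hash256 {
    match known_root {
        // A root was supplied: either the accurate one (normal replay) or a dummy
        // `Hash256::zero()` (shuffling-only replay). No tree hashing on this path.
        Some(root) => root,
        // No root supplied: fall back to the full tree hash, the allocation-heavy
        // path that dominated the `heaptrack` profile.
        None => state
            .update_tree_hash_cache()
            .expect("tree hash of a valid state should not fail"),
    }
}
```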
@@ -775,7 +775,12 @@ where
         metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES);

     let mut state = chain
-        .get_state(&target_block.state_root, Some(target_block.slot))?
+        .store
+        .get_inconsistent_state_for_attestation_verification_only(
+            &target_block.state_root,
+            Some(target_block.slot),
+        )
+        .map_err(BeaconChainError::from)?
         .ok_or_else(|| BeaconChainError::MissingBeaconState(target_block.state_root))?;

     metrics::stop_timer(state_read_timer);
@@ -390,9 +390,22 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
             });
         }

+        let block_root = get_block_root(&block);
+
         // Do not gossip a block from a finalized slot.
         check_block_against_finalized_slot(&block.message, chain)?;

+        // Check if the block is already known. We know it is post-finalization, so it is
+        // sufficient to check the fork choice.
+        //
+        // In normal operation this isn't necessary, however it is useful immediately after a
+        // reboot if the `observed_block_producers` cache is empty. In that case, without this
+        // check, we will load the parent and state from disk only to find out later that we
+        // already know this block.
+        if chain.fork_choice.read().contains_block(&block_root) {
+            return Err(BlockError::BlockIsAlreadyKnown);
+        }
+
         // Check that we have not already received a block with a valid signature for this slot.
         if chain
             .observed_block_producers
@@ -415,7 +428,6 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
         )?;

         let (mut parent, block) = load_parent(block, chain)?;
-        let block_root = get_block_root(&block);

         let state = cheap_state_advance_to_obtain_committees(
             &mut parent.beacon_state,
@@ -672,7 +684,10 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
         let state_root = state.update_tree_hash_cache()?;

         let op = if state.slot % T::EthSpec::slots_per_epoch() == 0 {
-            StoreOp::PutState(state_root.into(), Cow::Owned(state.clone()))
+            StoreOp::PutState(
+                state_root.into(),
+                Cow::Owned(state.clone_with(CloneConfig::committee_caches_only())),
+            )
         } else {
             StoreOp::PutStateSummary(
                 state_root.into(),
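The hunk above replaces a full `state.clone()` with a selective clone when persisting intermediate epoch-boundary states. A minimal sketch of the difference, assuming a `BeaconState<E>` named `state` is in scope; the comments about which caches survive are inferred from the method name and the PR description, not verified against `CloneConfig` itself.

```rust
// Full clone: duplicates the state and every cache attached to it (committee caches,
// pubkey cache, tree hash cache, ...), which is what made the stored copy expensive.
let full_copy = state.clone();

// Selective clone used in this PR: the committee caches are kept because they are
// cheap and needed later; the other caches are rebuilt on demand if ever required.
let slim_copy = state.clone_with(CloneConfig::committee_caches_only());
```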
@@ -30,6 +30,16 @@ use types::*;
 /// 32-byte key for accessing the `split` of the freezer DB.
 pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE";

+/// Defines how blocks should be replayed on states.
+#[derive(PartialEq)]
+pub enum BlockReplay {
+    /// Perform all transitions faithfully to the specification.
+    Accurate,
+    /// Don't compute state roots, eventually computing an invalid beacon state that can only be
+    /// used for obtaining shuffling.
+    InconsistentStateRoots,
+}
+
 /// On-disk database that stores finalized states efficiently.
 ///
 /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores
@@ -230,16 +240,40 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 // chain. This way we avoid returning a state that doesn't match `state_root`.
                 self.load_cold_state(state_root)
             } else {
-                self.load_hot_state(state_root)
+                self.load_hot_state(state_root, BlockReplay::Accurate)
             }
         } else {
-            match self.load_hot_state(state_root)? {
+            match self.load_hot_state(state_root, BlockReplay::Accurate)? {
                 Some(state) => Ok(Some(state)),
                 None => self.load_cold_state(state_root),
             }
         }
     }

+    /// Fetch a state from the store, but don't compute all of the values when replaying blocks
+    /// upon that state (e.g., state roots). Additionally, only states from the hot store are
+    /// returned.
+    ///
+    /// See `Self::get_state` for information about `slot`.
+    ///
+    /// ## Warning
+    ///
+    /// The returned state **is not a valid beacon state**, it can only be used for obtaining
+    /// shuffling to process attestations.
+    pub fn get_inconsistent_state_for_attestation_verification_only(
+        &self,
+        state_root: &Hash256,
+        slot: Option<Slot>,
+    ) -> Result<Option<BeaconState<E>>, Error> {
+        metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT);
+
+        if slot.map_or(false, |slot| slot < self.get_split_slot()) {
+            Ok(None)
+        } else {
+            self.load_hot_state(state_root, BlockReplay::InconsistentStateRoots)
+        }
+    }
+
     /// Delete a state, ensuring it is removed from the LRU cache, as well as from on-disk.
     ///
     /// It is assumed that all states being deleted reside in the hot DB, even if their slot is less
@@ -283,8 +317,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         }) = self.load_hot_state_summary(state_root)?
         {
             // NOTE: minor inefficiency here because we load an unnecessary hot state summary
+            //
+            // `BlockReplay` should be irrelevant here since we never replay blocks for an epoch
+            // boundary state in the hot DB.
             let state = self
-                .load_hot_state(&epoch_boundary_state_root)?
+                .load_hot_state(&epoch_boundary_state_root, BlockReplay::Accurate)?
                 .ok_or_else(|| {
                     HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root)
                 })?;
@@ -415,7 +452,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     /// Load a post-finalization state from the hot database.
     ///
     /// Will replay blocks from the nearest epoch boundary.
-    pub fn load_hot_state(&self, state_root: &Hash256) -> Result<Option<BeaconState<E>>, Error> {
+    pub fn load_hot_state(
+        &self,
+        state_root: &Hash256,
+        block_replay: BlockReplay,
+    ) -> Result<Option<BeaconState<E>>, Error> {
         metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);

         if let Some(HotStateSummary {
@@ -436,7 +477,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         } else {
             let blocks =
                 self.load_blocks_to_replay(boundary_state.slot, slot, latest_block_root)?;
-            self.replay_blocks(boundary_state, blocks, slot)?
+            self.replay_blocks(boundary_state, blocks, slot, block_replay)?
         };

         Ok(Some(state))
@@ -567,7 +608,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         )?;

         // 3. Replay the blocks on top of the low restore point.
-        self.replay_blocks(low_restore_point, blocks, slot)
+        self.replay_blocks(low_restore_point, blocks, slot, BlockReplay::Accurate)
     }

     /// Get a suitable block root for backtracking from `high_restore_point` to the state at `slot`.
@@ -624,9 +665,19 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     fn replay_blocks(
         &self,
         mut state: BeaconState<E>,
-        blocks: Vec<SignedBeaconBlock<E>>,
+        mut blocks: Vec<SignedBeaconBlock<E>>,
         target_slot: Slot,
+        block_replay: BlockReplay,
     ) -> Result<BeaconState<E>, Error> {
+        if block_replay == BlockReplay::InconsistentStateRoots {
+            for i in 0..blocks.len() {
+                blocks[i].message.state_root = Hash256::zero();
+                if i > 0 {
+                    blocks[i].message.parent_root = blocks[i - 1].canonical_root()
+                }
+            }
+        }
+
         let state_root_from_prev_block = |i: usize, state: &BeaconState<E>| {
             if i > 0 {
                 let prev_block = &blocks[i - 1].message;
@@ -646,10 +697,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             }

             while state.slot < block.message.slot {
-                let state_root = state_root_from_prev_block(i, &state);
+                let state_root = match block_replay {
+                    BlockReplay::Accurate => state_root_from_prev_block(i, &state),
+                    BlockReplay::InconsistentStateRoots => Some(Hash256::zero()),
+                };
                 per_slot_processing(&mut state, state_root, &self.spec)
                     .map_err(HotColdDBError::BlockReplaySlotError)?;
             }

             per_block_processing(
                 &mut state,
                 &block,
@@ -661,7 +716,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         }

         while state.slot < target_slot {
-            let state_root = state_root_from_prev_block(blocks.len(), &state);
+            let state_root = match block_replay {
+                BlockReplay::Accurate => state_root_from_prev_block(blocks.len(), &state),
+                BlockReplay::InconsistentStateRoots => Some(Hash256::zero()),
+            };
             per_slot_processing(&mut state, state_root, &self.spec)
                 .map_err(HotColdDBError::BlockReplaySlotError)?;
         }
@@ -7,6 +7,7 @@ use rayon::prelude::*;
 use ssz_derive::{Decode, Encode};
 use ssz_types::VariableList;
 use std::cmp::Ordering;
+use std::iter::ExactSizeIterator;
 use tree_hash::{mix_in_length, MerkleHasher, TreeHash};

 /// The number of fields on a beacon state.
@@ -288,17 +289,17 @@ impl ValidatorsListTreeHashCache {
     fn recalculate_tree_hash_root(&mut self, validators: &[Validator]) -> Result<Hash256, Error> {
         let mut list_arena = std::mem::take(&mut self.list_arena);

-        let leaves = self
-            .values
-            .leaves(validators)?
-            .into_iter()
-            .flatten()
-            .map(|h| h.to_fixed_bytes())
-            .collect::<Vec<_>>();
+        let leaves = self.values.leaves(validators)?;
+        let num_leaves = leaves.iter().map(|arena| arena.len()).sum();
+
+        let leaves_iter = ForcedExactSizeIterator {
+            iter: leaves.into_iter().flatten().map(|h| h.to_fixed_bytes()),
+            len: num_leaves,
+        };

         let list_root = self
             .list_cache
-            .recalculate_merkle_root(&mut list_arena, leaves.into_iter())?;
+            .recalculate_merkle_root(&mut list_arena, leaves_iter)?;

         self.list_arena = list_arena;

|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Provides a wrapper around some `iter` if the number of items in the iterator is known to the
|
||||||
|
/// programmer but not the compiler. This allows use of `ExactSizeIterator` in some occasions.
|
||||||
|
///
|
||||||
|
/// Care should be taken to ensure `len` is accurate.
|
||||||
|
struct ForcedExactSizeIterator<I> {
|
||||||
|
iter: I,
|
||||||
|
len: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<V, I: Iterator<Item = V>> Iterator for ForcedExactSizeIterator<I> {
|
||||||
|
type Item = V;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
self.iter.next()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<V, I: Iterator<Item = V>> ExactSizeIterator for ForcedExactSizeIterator<I> {
|
||||||
|
fn len(&self) -> usize {
|
||||||
|
self.len
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Provides a cache for each of the `Validator` objects in `state.validators` and computes the
|
/// Provides a cache for each of the `Validator` objects in `state.validators` and computes the
|
||||||
/// roots of these using Rayon parallelization.
|
/// roots of these using Rayon parallelization.
|
||||||
#[derive(Debug, PartialEq, Clone, Default, Encode, Decode)]
|
#[derive(Debug, PartialEq, Clone, Default, Encode, Decode)]
|
||||||
|
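A hypothetical usage sketch for the wrapper added above (not part of the commit): it shows the length information that `flatten()` discards and that the forced iterator restores. It would have to live in the same module, since `ForcedExactSizeIterator` is private.

```rust
#[test]
fn forced_len_survives_flatten() {
    let nested: Vec<Vec<u8>> = vec![vec![1, 2], vec![3, 4, 5]];
    let expected_len: usize = nested.iter().map(Vec::len).sum();

    let flat = nested.into_iter().flatten();
    // A flattened iterator cannot report its total length up front, which is why the
    // old code had to collect into an intermediate `Vec` before hashing.
    assert_eq!(flat.size_hint(), (0, None));

    let forced = ForcedExactSizeIterator {
        iter: flat,
        len: expected_len,
    };
    // The caller-supplied length is what `ExactSizeIterator::len` reports, so a consumer
    // such as `recalculate_merkle_root` can size its work without the extra allocation.
    assert_eq!(forced.len(), expected_len);
    assert_eq!(forced.collect::<Vec<u8>>(), vec![1, 2, 3, 4, 5]);
}
```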