Use forwards iterator for state root lookups (#2422)

## Issue Addressed

#2377 

## Proposed Changes

Implement the same code used for block root lookups (from #2376) to state root lookups in order to improve performance and reduce associated memory spikes (e.g. from certain HTTP API requests).

## Additional Changes

- Tests using `rev_iter_state_roots` and `rev_iter_block_roots` have been refactored to use their `forwards` versions instead.
- The `rev_iter_state_roots` and `rev_iter_block_roots` functions are now unused and have been removed.
- The `state_at_slot` function has been changed to use the `forwards` iterator.

## Additional Info

- Some tests still need to be refactored to use their `forwards_iter` versions. These tests start their iteration from a specific beacon state and thus use the `rev_iter_state_roots_from` and `rev_iter_block_roots_from` functions. If they can be refactored, those functions can also be removed.
This commit is contained in:
Mac L 2021-07-06 02:38:53 +00:00
parent 73d002ef92
commit 406e3921d9
6 changed files with 283 additions and 84 deletions

View File

@ -390,29 +390,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(|slot| slot.epoch(T::EthSpec::slots_per_epoch()))
}
/// Iterates across all `(block_root, slot)` pairs from the head of the chain (inclusive) to
/// the earliest reachable ancestor (may or may not be genesis).
/// Iterates across all `(block_root, slot)` pairs from `start_slot`
/// to the head of the chain (inclusive).
///
/// ## Notes
///
/// `slot` always decreases by `1`.
/// - `slot` always increases by `1`.
/// - Skipped slots contain the root of the closest prior
/// non-skipped slot (identical to the way they are stored in `state.block_roots`) .
/// non-skipped slot (identical to the way they are stored in `state.block_roots`).
/// - Iterator returns `(Hash256, Slot)`.
/// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot
/// returned may be earlier than the wall-clock slot.
pub fn rev_iter_block_roots(
&self,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
let head = self.head()?;
let iter = BlockRootsIterator::owned(self.store.clone(), head.beacon_state);
Ok(
std::iter::once(Ok((head.beacon_block_root, head.beacon_block.slot())))
.chain(iter)
.map(|result| result.map_err(|e| e.into())),
)
}
pub fn forwards_iter_block_roots(
&self,
start_slot: Slot,
@ -434,7 +420,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// ## Notes
///
/// `slot` always decreases by `1`.
/// - `slot` always decreases by `1`.
/// - Skipped slots contain the root of the closest prior
/// non-skipped slot (identical to the way they are stored in `state.block_roots`) .
/// - Iterator returns `(Hash256, Slot)`.
@ -526,29 +512,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}
/// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to
/// the earliest reachable ancestor (may or may not be genesis).
/// Iterates backwards across all `(state_root, slot)` pairs starting from
/// an arbitrary `BeaconState` to the earliest reachable ancestor (may or may not be genesis).
///
/// ## Notes
///
/// `slot` always decreases by `1`.
/// - `slot` always decreases by `1`.
/// - Iterator returns `(Hash256, Slot)`.
/// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot
/// returned may be earlier than the wall-clock slot.
pub fn rev_iter_state_roots(
&self,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
let head = self.head()?;
let head_slot = head.beacon_state.slot;
let head_state_root = head.beacon_state_root();
let iter = StateRootsIterator::owned(self.store.clone(), head.beacon_state);
let iter = std::iter::once(Ok((head_state_root, head_slot)))
.chain(iter)
.map(|result| result.map_err(Into::into));
Ok(iter)
}
/// As for `rev_iter_state_roots` but starting from an arbitrary `BeaconState`.
pub fn rev_iter_state_roots_from<'a>(
&self,
state_root: Hash256,
@ -559,6 +531,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(|result| result.map_err(Into::into))
}
/// Iterates across all `(state_root, slot)` pairs from `start_slot`
/// to the head of the chain (inclusive).
///
/// ## Notes
///
/// - `slot` always increases by `1`.
/// - Iterator returns `(Hash256, Slot)`.
pub fn forwards_iter_state_roots(
&self,
start_slot: Slot,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
let local_head = self.head()?;
let iter = HotColdDB::forwards_state_roots_iterator(
self.store.clone(),
start_slot,
local_head.beacon_state_root(),
local_head.beacon_state,
&self.spec,
)?;
Ok(iter.map(|result| result.map_err(Into::into)))
}
/// Returns the block at the given slot, if any. Only returns blocks in the canonical chain.
///
/// Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot.
@ -580,16 +576,48 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
/// Returns the block at the given slot, if any. Only returns blocks in the canonical chain.
/// Returns the state root at the given slot, if any. Only returns state roots in the canonical chain.
///
/// ## Errors
///
/// May return a database error.
pub fn state_root_at_slot(&self, slot: Slot) -> Result<Option<Hash256>, Error> {
process_results(self.rev_iter_state_roots()?, |mut iter| {
iter.find(|(_, this_slot)| *this_slot == slot)
.map(|(root, _)| root)
})
pub fn state_root_at_slot(&self, request_slot: Slot) -> Result<Option<Hash256>, Error> {
if request_slot > self.slot()? {
return Ok(None);
} else if request_slot == self.spec.genesis_slot {
return Ok(Some(self.genesis_state_root));
}
// Try an optimized path of reading the root directly from the head state.
let fast_lookup: Option<Hash256> = self.with_head(|head| {
if head.beacon_block.slot() <= request_slot {
// Return the head state root if all slots between the request and the head are skipped.
Ok(Some(head.beacon_state_root()))
} else if let Ok(root) = head.beacon_state.get_state_root(request_slot) {
// Return the root if it's easily accessible from the head state.
Ok(Some(*root))
} else {
// Fast lookup is not possible.
Ok::<_, Error>(None)
}
})?;
if let Some(root) = fast_lookup {
return Ok(Some(root));
}
process_results(self.forwards_iter_state_roots(request_slot)?, |mut iter| {
if let Some((root, slot)) = iter.next() {
if slot == request_slot {
Ok(Some(root))
} else {
// Sanity check.
Err(Error::InconsistentForwardsIter { request_slot, slot })
}
} else {
Ok(None)
}
})?
}
/// Returns the block root at the given slot, if any. Only returns roots in the canonical chain.
@ -896,7 +924,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(state)
}
Ordering::Less => {
let state_root = process_results(self.rev_iter_state_roots()?, |iter| {
let state_root = process_results(self.forwards_iter_state_roots(slot)?, |iter| {
iter.take_while(|(_, current_slot)| *current_slot >= slot)
.find(|(_, current_slot)| *current_slot == slot)
.map(|(root, _slot)| root)

View File

@ -309,7 +309,7 @@ fn epoch_boundary_state_attestation_processing() {
let finalized_epoch = harness
.chain
.head_info()
.expect("head ok")
.expect("should get head")
.finalized_checkpoint
.epoch;
@ -444,8 +444,8 @@ fn delete_blocks_and_states() {
let split_slot = store.get_split_slot();
let finalized_states = harness
.chain
.rev_iter_state_roots()
.expect("rev iter ok")
.forwards_iter_state_roots(Slot::new(0))
.expect("should get iter")
.map(Result::unwrap);
for (state_root, slot) in finalized_states {
@ -706,7 +706,7 @@ fn check_shuffling_compatible(
{
let (block_root, slot) = maybe_tuple.unwrap();
// Shuffling is compatible targeting the current epoch,
// iff slot is greater than or equal to the current epoch pivot block
// if slot is greater than or equal to the current epoch pivot block.
assert_eq!(
harness.chain.shuffling_is_compatible(
&block_root,
@ -1671,7 +1671,7 @@ fn pruning_test(
let all_canonical_states = harness
.chain
.rev_iter_state_roots()
.forwards_iter_state_roots(Slot::new(0))
.unwrap()
.map(Result::unwrap)
.map(|(state_root, _)| state_root.into())
@ -1799,17 +1799,12 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
.map(|checkpoint| (checkpoint.beacon_block_root, checkpoint.beacon_block.slot()))
.collect::<Vec<_>>();
let head = harness.chain.head().expect("should get head");
let mut forward_block_roots = HotColdDB::forwards_block_roots_iterator(
harness.chain.store.clone(),
Slot::new(0),
head.beacon_state,
head.beacon_block_root,
&harness.spec,
)
.unwrap()
.map(Result::unwrap)
.collect::<Vec<_>>();
let mut forward_block_roots = harness
.chain
.forwards_iter_block_roots(Slot::new(0))
.expect("should get iter")
.map(Result::unwrap)
.collect::<Vec<_>>();
// Drop the block roots for skipped slots.
forward_block_roots.dedup_by_key(|(block_root, _)| *block_root);
@ -1827,10 +1822,10 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
/// Check that every state from the canonical chain is in the database, and that the
/// reverse state and block root iterators reach genesis.
fn check_iterators(harness: &TestHarness) {
let mut min_slot = None;
let mut max_slot = None;
for (state_root, slot) in harness
.chain
.rev_iter_state_roots()
.forwards_iter_state_roots(Slot::new(0))
.expect("should get iter")
.map(Result::unwrap)
{
@ -1844,20 +1839,23 @@ fn check_iterators(harness: &TestHarness) {
"state {:?} from canonical chain should be in DB",
state_root
);
min_slot = Some(slot);
max_slot = Some(slot);
}
// Assert that we reached genesis.
assert_eq!(min_slot, Some(Slot::new(0)));
// Assert that the block root iterator reaches genesis.
// Assert that we reached the head.
assert_eq!(
max_slot,
Some(harness.chain.head_info().expect("should get head").slot)
);
// Assert that the block root iterator reaches the head.
assert_eq!(
harness
.chain
.rev_iter_block_roots()
.forwards_iter_block_roots(Slot::new(0))
.expect("should get iter")
.last()
.map(Result::unwrap)
.map(|(_, slot)| slot),
Some(Slot::new(0))
Some(harness.chain.head_info().expect("should get head").slot)
);
}

View File

@ -77,13 +77,13 @@ fn iterators() {
let block_roots: Vec<(Hash256, Slot)> = harness
.chain
.rev_iter_block_roots()
.forwards_iter_block_roots(Slot::new(0))
.expect("should get iter")
.map(Result::unwrap)
.collect();
let state_roots: Vec<(Hash256, Slot)> = harness
.chain
.rev_iter_state_roots()
.forwards_iter_state_roots(Slot::new(0))
.expect("should get iter")
.map(Result::unwrap)
.collect();
@ -112,30 +112,30 @@ fn iterators() {
block_roots.windows(2).for_each(|x| {
assert_eq!(
x[1].1,
x[0].1 - 1,
"block root slots should be decreasing by one"
x[0].1 + 1,
"block root slots should be increasing by one"
)
});
state_roots.windows(2).for_each(|x| {
assert_eq!(
x[1].1,
x[0].1 - 1,
"state root slots should be decreasing by one"
x[0].1 + 1,
"state root slots should be increasing by one"
)
});
let head = &harness.chain.head().expect("should get head");
assert_eq!(
*block_roots.first().expect("should have some block roots"),
*block_roots.last().expect("should have some block roots"),
(head.beacon_block_root, head.beacon_block.slot()),
"first block root and slot should be for the head block"
"last block root and slot should be for the head block"
);
assert_eq!(
*state_roots.first().expect("should have some state roots"),
*state_roots.last().expect("should have some state roots"),
(head.beacon_state_root(), head.beacon_state.slot),
"first state root and slot should be for the head state"
"last state root and slot should be for the head state"
);
}

View File

@ -861,13 +861,10 @@ impl ApiTester {
pub async fn test_beacon_headers_all_parents(self) -> Self {
let mut roots = self
.chain
.rev_iter_block_roots()
.forwards_iter_block_roots(Slot::new(0))
.unwrap()
.map(Result::unwrap)
.map(|(root, _slot)| root)
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect::<Vec<_>>();
// The iterator natively returns duplicate roots for skipped slots.

View File

@ -1,7 +1,7 @@
use crate::chunked_iter::ChunkedVectorIter;
use crate::chunked_vector::BlockRoots;
use crate::chunked_vector::{BlockRoots, StateRoots};
use crate::errors::{Error, Result};
use crate::iter::BlockRootsIterator;
use crate::iter::{BlockRootsIterator, StateRootsIterator};
use crate::{HotColdDB, ItemStore};
use itertools::process_results;
use std::sync::Arc;
@ -172,3 +172,169 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
self.do_next().transpose()
}
}
/// Forwards state roots iterator that makes use of the `state_roots` table in the freezer DB.
pub struct FrozenForwardsStateRootsIterator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
inner: ChunkedVectorIter<StateRoots, E, Hot, Cold>,
}
/// Forwards state roots iterator that reverses a backwards iterator (only good for short ranges).
pub struct SimpleForwardsStateRootsIterator {
// Values from the backwards iterator (in slot descending order)
values: Vec<(Hash256, Slot)>,
}
/// Fusion of the above two approaches to forwards iteration. Fast and efficient.
pub enum HybridForwardsStateRootsIterator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
PreFinalization {
iter: Box<FrozenForwardsStateRootsIterator<E, Hot, Cold>>,
/// Data required by the `PostFinalization` iterator when we get to it.
continuation_data: Box<Option<(BeaconState<E>, Hash256)>>,
},
PostFinalization {
iter: SimpleForwardsStateRootsIterator,
},
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
FrozenForwardsStateRootsIterator<E, Hot, Cold>
{
pub fn new(
store: Arc<HotColdDB<E, Hot, Cold>>,
start_slot: Slot,
last_restore_point_slot: Slot,
spec: &ChainSpec,
) -> Self {
Self {
inner: ChunkedVectorIter::new(
store,
start_slot.as_usize(),
last_restore_point_slot,
spec,
),
}
}
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
for FrozenForwardsStateRootsIterator<E, Hot, Cold>
{
type Item = (Hash256, Slot);
fn next(&mut self) -> Option<Self::Item> {
self.inner
.next()
.map(|(slot, state_hash)| (state_hash, Slot::from(slot)))
}
}
impl SimpleForwardsStateRootsIterator {
pub fn new<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: Arc<HotColdDB<E, Hot, Cold>>,
start_slot: Slot,
end_state: BeaconState<E>,
end_state_root: Hash256,
) -> Result<Self> {
// Iterate backwards from the end state, stopping at the start slot.
let values = process_results(
std::iter::once(Ok((end_state_root, end_state.slot)))
.chain(StateRootsIterator::owned(store, end_state)),
|iter| {
iter.take_while(|(_, slot)| *slot >= start_slot)
.collect::<Vec<_>>()
},
)?;
Ok(Self { values })
}
}
impl Iterator for SimpleForwardsStateRootsIterator {
type Item = Result<(Hash256, Slot)>;
fn next(&mut self) -> Option<Self::Item> {
// Pop from the end of the vector to get the state roots in slot-ascending order.
Ok(self.values.pop()).transpose()
}
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
HybridForwardsStateRootsIterator<E, Hot, Cold>
{
pub fn new(
store: Arc<HotColdDB<E, Hot, Cold>>,
start_slot: Slot,
end_state: BeaconState<E>,
end_state_root: Hash256,
spec: &ChainSpec,
) -> Result<Self> {
use HybridForwardsStateRootsIterator::*;
let latest_restore_point_slot = store.get_latest_restore_point_slot();
let result = if start_slot < latest_restore_point_slot {
PreFinalization {
iter: Box::new(FrozenForwardsStateRootsIterator::new(
store,
start_slot,
latest_restore_point_slot,
spec,
)),
continuation_data: Box::new(Some((end_state, end_state_root))),
}
} else {
PostFinalization {
iter: SimpleForwardsStateRootsIterator::new(
store,
start_slot,
end_state,
end_state_root,
)?,
}
};
Ok(result)
}
fn do_next(&mut self) -> Result<Option<(Hash256, Slot)>> {
use HybridForwardsStateRootsIterator::*;
match self {
PreFinalization {
iter,
continuation_data,
} => {
match iter.next() {
Some(x) => Ok(Some(x)),
// Once the pre-finalization iterator is consumed, transition
// to a post-finalization iterator beginning from the last slot
// of the pre iterator.
None => {
let (end_state, end_state_root) =
continuation_data.take().ok_or(Error::NoContinuationData)?;
*self = PostFinalization {
iter: SimpleForwardsStateRootsIterator::new(
iter.inner.store.clone(),
Slot::from(iter.inner.end_vindex),
end_state,
end_state_root,
)?,
};
self.do_next()
}
}
}
PostFinalization { iter } => iter.next().transpose(),
}
}
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
for HybridForwardsStateRootsIterator<E, Hot, Cold>
{
type Item = Result<(Hash256, Slot)>;
fn next(&mut self) -> Option<Self::Item> {
self.do_next().transpose()
}
}

View File

@ -2,7 +2,7 @@ use crate::chunked_vector::{
store_updated_vector, BlockRoots, HistoricalRoots, RandaoMixes, StateRoots,
};
use crate::config::{OnDiskStoreConfig, StoreConfig};
use crate::forwards_iter::HybridForwardsBlockRootsIterator;
use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::leveldb_store::BytesKey;
@ -393,6 +393,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
HybridForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root, spec)
}
pub fn forwards_state_roots_iterator(
store: Arc<Self>,
start_slot: Slot,
end_state_root: Hash256,
end_state: BeaconState<E>,
spec: &ChainSpec,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
HybridForwardsStateRootsIterator::new(store, start_slot, end_state, end_state_root, spec)
}
/// Load an epoch boundary state by using the hot state summary look-up.
///
/// Will fall back to the cold DB if a hot state summary is not found.