Tidy ancestor iterators

This commit is contained in:
Paul Hauner 2019-08-05 16:25:21 +10:00
parent 01054ecf2f
commit b096e3a643
No known key found for this signature in database
GPG Key ID: 5E2CFF9B75FA63DF
11 changed files with 125 additions and 199 deletions

View File

@ -1,6 +1,7 @@
use crate::checkpoint::CheckPoint; use crate::checkpoint::CheckPoint;
use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::fork_choice::{Error as ForkChoiceError, ForkChoice}; use crate::fork_choice::{Error as ForkChoiceError, ForkChoice};
use crate::iter::{ReverseBlockRootIterator, ReverseStateRootIterator};
use crate::metrics::Metrics; use crate::metrics::Metrics;
use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY};
use lmd_ghost::LmdGhost; use lmd_ghost::LmdGhost;
@ -19,7 +20,7 @@ use state_processing::{
per_slot_processing, BlockProcessingError, per_slot_processing, BlockProcessingError,
}; };
use std::sync::Arc; use std::sync::Arc;
use store::iter::{BestBlockRootsIterator, BlockIterator, BlockRootsIterator, StateRootsIterator}; use store::iter::{BlockRootsIterator, StateRootsIterator};
use store::{Error as DBError, Store}; use store::{Error as DBError, Store};
use tree_hash::TreeHash; use tree_hash::TreeHash;
use types::*; use types::*;
@ -224,45 +225,53 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(headers?) Ok(headers?)
} }
/// Iterate in reverse (highest to lowest slot) through all blocks from the block at `slot`
/// through to the genesis block.
///
/// Returns `None` for headers prior to genesis or when there is an error reading from `Store`.
///
/// Contains duplicate headers when skip slots are encountered.
pub fn rev_iter_blocks(&self, slot: Slot) -> BlockIterator<T::EthSpec, T::Store> {
BlockIterator::owned(self.store.clone(), self.state.read().clone(), slot)
}
/// Iterates in reverse (highest to lowest slot) through all block roots from `slot` through to /// Iterates through all the `BeaconBlock` roots and slots, first returning
/// genesis. /// `self.head().beacon_block` then all prior blocks until either genesis or if the database
/// fails to return a prior block.
/// ///
/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. /// Returns duplicate roots for skip-slots.
/// ///
/// Contains duplicate roots when skip slots are encountered. /// Iterator returns `(Hash256, Slot)`.
pub fn rev_iter_block_roots(&self, slot: Slot) -> BlockRootsIterator<T::EthSpec, T::Store> {
BlockRootsIterator::owned(self.store.clone(), self.state.read().clone(), slot)
}
/// Iterates in reverse (highest to lowest slot) through all block roots from largest
/// `slot <= beacon_state.slot` through to genesis.
/// ///
/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. /// ## Note
/// ///
/// Contains duplicate roots when skip slots are encountered. /// Because this iterator starts at the `head` of the chain (viz., the best block), the first slot
pub fn rev_iter_best_block_roots( /// returned may be earlier than the wall-clock slot.
pub fn rev_iter_block_roots(
&self, &self,
slot: Slot, slot: Slot,
) -> BestBlockRootsIterator<T::EthSpec, T::Store> { ) -> ReverseBlockRootIterator<T::EthSpec, T::Store> {
BestBlockRootsIterator::owned(self.store.clone(), self.state.read().clone(), slot) let state = &self.head().beacon_state;
let block_root = self.head().beacon_block_root;
let block_slot = state.slot;
let iter = BlockRootsIterator::owned(self.store.clone(), state.clone(), slot);
ReverseBlockRootIterator::new((block_root, block_slot), iter)
} }
/// Iterates in reverse (highest to lowest slot) through all state roots from `slot` through to /// Iterates through all the `BeaconState` roots and slots, first returning
/// genesis. /// `self.head().beacon_state` then all prior states until either genesis or if the database
/// fails to return a prior state.
/// ///
/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. /// Iterator returns `(Hash256, Slot)`.
pub fn rev_iter_state_roots(&self, slot: Slot) -> StateRootsIterator<T::EthSpec, T::Store> { ///
StateRootsIterator::owned(self.store.clone(), self.state.read().clone(), slot) /// ## Note
///
/// Because this iterator starts at the `head` of the chain (viz., the best block), the first slot
/// returned may be earlier than the wall-clock slot.
pub fn rev_iter_state_roots(
&self,
slot: Slot,
) -> ReverseStateRootIterator<T::EthSpec, T::Store> {
let state = &self.head().beacon_state;
let state_root = self.head().beacon_state_root;
let state_slot = state.slot;
let iter = StateRootsIterator::owned(self.store.clone(), state.clone(), slot);
ReverseStateRootIterator::new((state_root, state_slot), iter)
} }
/// Returns the block at the given root, if any. /// Returns the block at the given root, if any.
@ -279,8 +288,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Returns a read-lock guarded `BeaconState` which is the `canonical_head` that has been /// Returns a read-lock guarded `BeaconState` which is the `canonical_head` that has been
/// updated to match the current slot clock. /// updated to match the current slot clock.
pub fn current_state(&self) -> RwLockReadGuard<BeaconState<T::EthSpec>> { pub fn speculative_state(&self) -> Result<RwLockReadGuard<BeaconState<T::EthSpec>>, Error> {
self.state.read() // TODO: ensure the state has done a catch-up.
Ok(self.state.read())
} }
/// Returns a read-lock guarded `CheckPoint` struct for reading the head (as chosen by the /// Returns a read-lock guarded `CheckPoint` struct for reading the head (as chosen by the

View File

@ -52,7 +52,7 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
// been justified for at least 1 epoch ... If no such descendant exists, // been justified for at least 1 epoch ... If no such descendant exists,
// set justified_head to finalized_head. // set justified_head to finalized_head.
let (start_state, start_block_root, start_block_slot) = { let (start_state, start_block_root, start_block_slot) = {
let state = chain.current_state(); let state = &chain.head().beacon_state;
let (block_root, block_slot) = let (block_root, block_slot) =
if state.current_epoch() + 1 > state.current_justified_checkpoint.epoch { if state.current_epoch() + 1 > state.current_justified_checkpoint.epoch {

View File

@ -0,0 +1,48 @@
use store::iter::{BlockRootsIterator, StateRootsIterator};
use types::{Hash256, Slot};
pub type ReverseBlockRootIterator<'a, E, S> =
ReverseHashAndSlotIterator<BlockRootsIterator<'a, E, S>>;
pub type ReverseStateRootIterator<'a, E, S> =
ReverseHashAndSlotIterator<StateRootsIterator<'a, E, S>>;
pub type ReverseHashAndSlotIterator<I> = ReverseChainIterator<(Hash256, Slot), I>;
/// Provides a wrapper for an iterator that returns a given `T` before it starts returning results of
/// the `Iterator`.
pub struct ReverseChainIterator<T, I> {
first_value_used: bool,
first_value: T,
iter: I,
}
impl<T, I> ReverseChainIterator<T, I>
where
T: Sized,
I: Iterator<Item = T> + Sized,
{
pub fn new(first_value: T, iter: I) -> Self {
Self {
first_value_used: false,
first_value,
iter,
}
}
}
impl<T, I> Iterator for ReverseChainIterator<T, I>
where
T: Clone,
I: Iterator<Item = T>,
{
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
if self.first_value_used {
self.iter.next()
} else {
self.first_value_used = true;
Some(self.first_value.clone())
}
}
}

View File

@ -2,6 +2,7 @@ mod beacon_chain;
mod checkpoint; mod checkpoint;
mod errors; mod errors;
mod fork_choice; mod fork_choice;
mod iter;
mod metrics; mod metrics;
mod persisted_beacon_chain; mod persisted_beacon_chain;
pub mod test_utils; pub mod test_utils;

View File

@ -198,7 +198,7 @@ where
fn get_state_at_slot(&self, state_slot: Slot) -> BeaconState<E> { fn get_state_at_slot(&self, state_slot: Slot) -> BeaconState<E> {
let state_root = self let state_root = self
.chain .chain
.rev_iter_state_roots(self.chain.current_state().slot - 1) .rev_iter_state_roots(self.chain.head().beacon_state.slot - 1)
.find(|(_hash, slot)| *slot == state_slot) .find(|(_hash, slot)| *slot == state_slot)
.map(|(hash, _slot)| hash) .map(|(hash, _slot)| hash)
.expect("could not find state root"); .expect("could not find state root");

View File

@ -266,7 +266,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
fn root_at_slot(&self, target_slot: Slot) -> Option<Hash256> { fn root_at_slot(&self, target_slot: Slot) -> Option<Hash256> {
self.chain self.chain
.rev_iter_best_block_roots(target_slot) .rev_iter_block_roots(target_slot)
.take(1) .take(1)
.find(|(_root, slot)| *slot == target_slot) .find(|(_root, slot)| *slot == target_slot)
.map(|(root, _slot)| root) .map(|(root, _slot)| root)
@ -280,6 +280,8 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
req: BeaconBlockRootsRequest, req: BeaconBlockRootsRequest,
network: &mut NetworkContext<T::EthSpec>, network: &mut NetworkContext<T::EthSpec>,
) { ) {
let state = &self.chain.head().beacon_state;
debug!( debug!(
self.log, self.log,
"BlockRootsRequest"; "BlockRootsRequest";
@ -290,8 +292,8 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
let mut roots: Vec<BlockRootSlot> = self let mut roots: Vec<BlockRootSlot> = self
.chain .chain
.rev_iter_best_block_roots(req.start_slot + req.count) .rev_iter_block_roots(std::cmp::min(req.start_slot + req.count, state.slot))
.take(req.count as usize) .take_while(|(_root, slot)| req.start_slot <= *slot)
.map(|(block_root, slot)| BlockRootSlot { slot, block_root }) .map(|(block_root, slot)| BlockRootSlot { slot, block_root })
.collect(); .collect();
@ -302,7 +304,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"msg" => "Failed to return all requested hashes", "msg" => "Failed to return all requested hashes",
"start_slot" => req.start_slot, "start_slot" => req.start_slot,
"current_slot" => self.chain.current_state().slot, "current_slot" => self.chain.present_slot(),
"requested" => req.count, "requested" => req.count,
"returned" => roots.len(), "returned" => roots.len(),
); );
@ -389,6 +391,8 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
req: BeaconBlockHeadersRequest, req: BeaconBlockHeadersRequest,
network: &mut NetworkContext<T::EthSpec>, network: &mut NetworkContext<T::EthSpec>,
) { ) {
let state = &self.chain.head().beacon_state;
debug!( debug!(
self.log, self.log,
"BlockHeadersRequest"; "BlockHeadersRequest";
@ -399,13 +403,10 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
let count = req.max_headers; let count = req.max_headers;
// Collect the block roots. // Collect the block roots.
//
// Instead of using `chain.rev_iter_blocks` we collect the roots first. This avoids
// unnecessary block deserialization when `req.skip_slots > 0`.
let mut roots: Vec<Hash256> = self let mut roots: Vec<Hash256> = self
.chain .chain
.rev_iter_best_block_roots(req.start_slot + count) .rev_iter_block_roots(std::cmp::min(req.start_slot + count, state.slot))
.take(count as usize) .take_while(|(_root, slot)| req.start_slot <= *slot)
.map(|(root, _slot)| root) .map(|(root, _slot)| root)
.collect(); .collect();

View File

@ -54,7 +54,7 @@ fn get_version(_req: Request<Body>) -> APIResult {
fn get_genesis_time<T: BeaconChainTypes + 'static>(req: Request<Body>) -> APIResult { fn get_genesis_time<T: BeaconChainTypes + 'static>(req: Request<Body>) -> APIResult {
let beacon_chain = req.extensions().get::<Arc<BeaconChain<T>>>().unwrap(); let beacon_chain = req.extensions().get::<Arc<BeaconChain<T>>>().unwrap();
let gen_time = { let gen_time = {
let state = beacon_chain.current_state(); let state = &beacon_chain.head().beacon_state;
state.genesis_time state.genesis_time
}; };
let body = Body::from( let body = Body::from(

View File

@ -40,7 +40,11 @@ impl<T: BeaconChainTypes> AttestationService for AttestationServiceInstance<T> {
// verify the slot, drop lock on state afterwards // verify the slot, drop lock on state afterwards
{ {
let slot_requested = req.get_slot(); let slot_requested = req.get_slot();
let state = &self.chain.current_state(); // TODO: this whole module is legacy and not maintained well.
let state = &self
.chain
.speculative_state()
.expect("This is legacy code and should be removed");
// Start by performing some checks // Start by performing some checks
// Check that the AttestationData is for the current slot (otherwise it will not be valid) // Check that the AttestationData is for the current slot (otherwise it will not be valid)

View File

@ -29,7 +29,11 @@ impl<T: BeaconChainTypes> ValidatorService for ValidatorServiceInstance<T> {
trace!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch()); trace!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch());
let spec = &self.chain.spec; let spec = &self.chain.spec;
let state = &self.chain.current_state(); // TODO: this whole module is legacy and not maintained well.
let state = &self
.chain
.speculative_state()
.expect("This is legacy code and should be removed");
let epoch = Epoch::from(req.get_epoch()); let epoch = Epoch::from(req.get_epoch());
let mut resp = GetDutiesResponse::new(); let mut resp = GetDutiesResponse::new();
let resp_validators = resp.mut_active_validators(); let resp_validators = resp.mut_active_validators();

View File

@ -4,20 +4,23 @@ use std::sync::Arc;
use types::{BeaconBlock, BeaconState, BeaconStateError, EthSpec, Hash256, Slot}; use types::{BeaconBlock, BeaconState, BeaconStateError, EthSpec, Hash256, Slot};
/// Implemented for types that have ancestors (e.g., blocks, states) that may be iterated over. /// Implemented for types that have ancestors (e.g., blocks, states) that may be iterated over.
///
/// ## Note
///
/// It is assumed that all ancestors for this object are stored in the database. If this is not the
/// case, the iterator will start returning `None` prior to genesis.
pub trait AncestorIter<U: Store, I: Iterator> { pub trait AncestorIter<U: Store, I: Iterator> {
/// Returns an iterator over the roots of the ancestors of `self`. /// Returns an iterator over the roots of the ancestors of `self`.
fn try_iter_ancestor_roots(&self, store: Arc<U>) -> Option<I>; fn try_iter_ancestor_roots(&self, store: Arc<U>) -> Option<I>;
} }
impl<'a, U: Store, E: EthSpec> AncestorIter<U, BestBlockRootsIterator<'a, E, U>> impl<'a, U: Store, E: EthSpec> AncestorIter<U, BlockRootsIterator<'a, E, U>> for BeaconBlock<E> {
for BeaconBlock<E>
{
/// Iterates across all the prior block roots of `self`, starting at the most recent and ending /// Iterates across all the prior block roots of `self`, starting at the most recent and ending
/// at genesis. /// at genesis.
fn try_iter_ancestor_roots(&self, store: Arc<U>) -> Option<BestBlockRootsIterator<'a, E, U>> { fn try_iter_ancestor_roots(&self, store: Arc<U>) -> Option<BlockRootsIterator<'a, E, U>> {
let state = store.get::<BeaconState<E>>(&self.state_root).ok()??; let state = store.get::<BeaconState<E>>(&self.state_root).ok()??;
Some(BestBlockRootsIterator::owned(store, state, self.slot)) Some(BlockRootsIterator::owned(store, state, self.slot))
} }
} }
@ -116,11 +119,6 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockIterator<'a, T, U> {
/// exhausted. /// exhausted.
/// ///
/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. /// Returns `None` for roots prior to genesis or when there is an error reading from `Store`.
///
/// ## Notes
///
/// See [`BestBlockRootsIterator`](struct.BestBlockRootsIterator.html), which has different
/// `start_slot` logic.
#[derive(Clone)] #[derive(Clone)]
pub struct BlockRootsIterator<'a, T: EthSpec, U> { pub struct BlockRootsIterator<'a, T: EthSpec, U> {
store: Arc<U>, store: Arc<U>,
@ -180,104 +178,6 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> {
} }
} }
/// Iterates backwards through block roots with `start_slot` highest possible value
/// `<= beacon_state.slot`.
///
/// The distinction between `BestBlockRootsIterator` and `BlockRootsIterator` is:
///
/// - `BestBlockRootsIterator` uses best-effort slot. When `start_slot` is greater than the latest available block root
/// on `beacon_state`, returns `Some(root, slot)` where `slot` is the latest available block
/// root.
/// - `BlockRootsIterator` is strict about `start_slot`. When `start_slot` is greater than the latest available block root
/// on `beacon_state`, returns `None`.
///
/// This is distinct from `BestBlockRootsIterator`.
///
/// Uses the `block_roots` field of `BeaconState` to as the source of block roots and will
/// perform a lookup on the `Store` for a prior `BeaconState` if `block_roots` has been
/// exhausted.
///
/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`.
#[derive(Clone)]
pub struct BestBlockRootsIterator<'a, T: EthSpec, U> {
store: Arc<U>,
beacon_state: Cow<'a, BeaconState<T>>,
slot: Slot,
}
impl<'a, T: EthSpec, U: Store> BestBlockRootsIterator<'a, T, U> {
/// Create a new iterator over all block roots in the given `beacon_state` and prior states.
pub fn new(store: Arc<U>, beacon_state: &'a BeaconState<T>, start_slot: Slot) -> Self {
let mut slot = start_slot;
if slot >= beacon_state.slot {
// Slot may be too high.
slot = beacon_state.slot;
if beacon_state.get_block_root(slot).is_err() {
slot -= 1;
}
}
Self {
store,
beacon_state: Cow::Borrowed(beacon_state),
slot: slot + 1,
}
}
/// Create a new iterator over all block roots in the given `beacon_state` and prior states.
pub fn owned(store: Arc<U>, beacon_state: BeaconState<T>, start_slot: Slot) -> Self {
let mut slot = start_slot;
if slot >= beacon_state.slot {
// Slot may be too high.
slot = beacon_state.slot;
// TODO: Use a function other than `get_block_root` as this will always return `Err()`
// for slot = state.slot.
if beacon_state.get_block_root(slot).is_err() {
slot -= 1;
}
}
Self {
store,
beacon_state: Cow::Owned(beacon_state),
slot: slot + 1,
}
}
}
impl<'a, T: EthSpec, U: Store> Iterator for BestBlockRootsIterator<'a, T, U> {
type Item = (Hash256, Slot);
fn next(&mut self) -> Option<Self::Item> {
if self.slot == 0 {
// End of Iterator
return None;
}
self.slot -= 1;
match self.beacon_state.get_block_root(self.slot) {
Ok(root) => Some((*root, self.slot)),
Err(BeaconStateError::SlotOutOfBounds) => {
// Read a `BeaconState` from the store that has access to prior historical root.
let beacon_state: BeaconState<T> = {
// Load the earliest state from disk.
let new_state_root = self.beacon_state.get_oldest_state_root().ok()?;
self.store.get(&new_state_root).ok()?
}?;
self.beacon_state = Cow::Owned(beacon_state);
let root = self.beacon_state.get_block_root(self.slot).ok()?;
Some((*root, self.slot))
}
_ => None,
}
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
@ -337,49 +237,6 @@ mod test {
} }
} }
#[test]
fn best_block_root_iter() {
let store = Arc::new(MemoryStore::open());
let slots_per_historical_root = MainnetEthSpec::slots_per_historical_root();
let mut state_a: BeaconState<MainnetEthSpec> = get_state();
let mut state_b: BeaconState<MainnetEthSpec> = get_state();
state_a.slot = Slot::from(slots_per_historical_root);
state_b.slot = Slot::from(slots_per_historical_root * 2);
let mut hashes = (0..).into_iter().map(|i| Hash256::from(i));
for root in &mut state_a.block_roots[..] {
*root = hashes.next().unwrap()
}
for root in &mut state_b.block_roots[..] {
*root = hashes.next().unwrap()
}
let state_a_root = hashes.next().unwrap();
state_b.state_roots[0] = state_a_root;
store.put(&state_a_root, &state_a).unwrap();
let iter = BestBlockRootsIterator::new(store.clone(), &state_b, state_b.slot);
assert!(
iter.clone().find(|(_root, slot)| *slot == 0).is_some(),
"iter should contain zero slot"
);
let mut collected: Vec<(Hash256, Slot)> = iter.collect();
collected.reverse();
let expected_len = 2 * MainnetEthSpec::slots_per_historical_root();
assert_eq!(collected.len(), expected_len);
for i in 0..expected_len {
assert_eq!(collected[i].0, Hash256::from(i as u64));
}
}
#[test] #[test]
fn state_root_iter() { fn state_root_iter() {
let store = Arc::new(MemoryStore::open()); let store = Arc::new(MemoryStore::open());

View File

@ -10,7 +10,7 @@ use lmd_ghost::{LmdGhost, ThreadSafeReducedTree as BaseThreadSafeReducedTree};
use rand::{prelude::*, rngs::StdRng}; use rand::{prelude::*, rngs::StdRng};
use std::sync::Arc; use std::sync::Arc;
use store::{ use store::{
iter::{AncestorIter, BestBlockRootsIterator}, iter::{AncestorIter, BlockRootsIterator},
MemoryStore, Store, MemoryStore, Store,
}; };
use types::{BeaconBlock, EthSpec, Hash256, MinimalEthSpec, Slot}; use types::{BeaconBlock, EthSpec, Hash256, MinimalEthSpec, Slot};
@ -159,7 +159,7 @@ fn get_ancestor_roots<E: EthSpec, U: Store>(
.expect("block should exist") .expect("block should exist")
.expect("store should not error"); .expect("store should not error");
<BeaconBlock<TestEthSpec> as AncestorIter<_, BestBlockRootsIterator<TestEthSpec, _>>>::try_iter_ancestor_roots( <BeaconBlock<TestEthSpec> as AncestorIter<_, BlockRootsIterator<TestEthSpec, _>>>::try_iter_ancestor_roots(
&block, store, &block, store,
) )
.expect("should be able to create ancestor iter") .expect("should be able to create ancestor iter")