From 5079c25bb250117e2f7fdd3cb383b1f471ca3f52 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 5 Aug 2019 16:25:21 +1000 Subject: [PATCH] Tidy ancestor iterators --- beacon_node/beacon_chain/src/beacon_chain.rs | 75 +++++---- beacon_node/beacon_chain/src/fork_choice.rs | 2 +- beacon_node/beacon_chain/src/iter.rs | 48 ++++++ beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/beacon_chain/src/test_utils.rs | 2 +- beacon_node/network/src/sync/simple_sync.rs | 19 +-- beacon_node/rest_api/src/beacon_node.rs | 2 +- beacon_node/rpc/src/attestation.rs | 6 +- beacon_node/rpc/src/validator.rs | 6 +- beacon_node/store/src/iter.rs | 159 +------------------ eth2/lmd_ghost/tests/test.rs | 4 +- 11 files changed, 125 insertions(+), 199 deletions(-) create mode 100644 beacon_node/beacon_chain/src/iter.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d0c50af70..58f64bc29 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1,6 +1,7 @@ use crate::checkpoint::CheckPoint; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::fork_choice::{Error as ForkChoiceError, ForkChoice}; +use crate::iter::{ReverseBlockRootIterator, ReverseStateRootIterator}; use crate::metrics::Metrics; use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; use lmd_ghost::LmdGhost; @@ -19,7 +20,7 @@ use state_processing::{ per_slot_processing, BlockProcessingError, }; use std::sync::Arc; -use store::iter::{BestBlockRootsIterator, BlockIterator, BlockRootsIterator, StateRootsIterator}; +use store::iter::{BlockRootsIterator, StateRootsIterator}; use store::{Error as DBError, Store}; use tree_hash::TreeHash; use types::*; @@ -216,45 +217,53 @@ impl BeaconChain { Ok(headers?) } - /// Iterate in reverse (highest to lowest slot) through all blocks from the block at `slot` - /// through to the genesis block. - /// - /// Returns `None` for headers prior to genesis or when there is an error reading from `Store`. - /// - /// Contains duplicate headers when skip slots are encountered. - pub fn rev_iter_blocks(&self, slot: Slot) -> BlockIterator { - BlockIterator::owned(self.store.clone(), self.state.read().clone(), slot) - } - /// Iterates in reverse (highest to lowest slot) through all block roots from `slot` through to - /// genesis. + /// Iterates through all the `BeaconBlock` roots and slots, first returning + /// `self.head().beacon_block` then all prior blocks until either genesis or if the database + /// fails to return a prior block. /// - /// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. + /// Returns duplicate roots for skip-slots. /// - /// Contains duplicate roots when skip slots are encountered. - pub fn rev_iter_block_roots(&self, slot: Slot) -> BlockRootsIterator { - BlockRootsIterator::owned(self.store.clone(), self.state.read().clone(), slot) - } - - /// Iterates in reverse (highest to lowest slot) through all block roots from largest - /// `slot <= beacon_state.slot` through to genesis. + /// Iterator returns `(Hash256, Slot)`. /// - /// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. + /// ## Note /// - /// Contains duplicate roots when skip slots are encountered. - pub fn rev_iter_best_block_roots( + /// Because this iterator starts at the `head` of the chain (viz., the best block), the first slot + /// returned may be earlier than the wall-clock slot. + pub fn rev_iter_block_roots( &self, slot: Slot, - ) -> BestBlockRootsIterator { - BestBlockRootsIterator::owned(self.store.clone(), self.state.read().clone(), slot) + ) -> ReverseBlockRootIterator { + let state = &self.head().beacon_state; + let block_root = self.head().beacon_block_root; + let block_slot = state.slot; + + let iter = BlockRootsIterator::owned(self.store.clone(), state.clone(), slot); + + ReverseBlockRootIterator::new((block_root, block_slot), iter) } - /// Iterates in reverse (highest to lowest slot) through all state roots from `slot` through to - /// genesis. + /// Iterates through all the `BeaconState` roots and slots, first returning + /// `self.head().beacon_state` then all prior states until either genesis or if the database + /// fails to return a prior state. /// - /// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. - pub fn rev_iter_state_roots(&self, slot: Slot) -> StateRootsIterator { - StateRootsIterator::owned(self.store.clone(), self.state.read().clone(), slot) + /// Iterator returns `(Hash256, Slot)`. + /// + /// ## Note + /// + /// Because this iterator starts at the `head` of the chain (viz., the best block), the first slot + /// returned may be earlier than the wall-clock slot. + pub fn rev_iter_state_roots( + &self, + slot: Slot, + ) -> ReverseStateRootIterator { + let state = &self.head().beacon_state; + let state_root = self.head().beacon_state_root; + let state_slot = state.slot; + + let iter = StateRootsIterator::owned(self.store.clone(), state.clone(), slot); + + ReverseStateRootIterator::new((state_root, state_slot), iter) } /// Returns the block at the given root, if any. @@ -271,8 +280,10 @@ impl BeaconChain { /// Returns a read-lock guarded `BeaconState` which is the `canonical_head` that has been /// updated to match the current slot clock. - pub fn current_state(&self) -> RwLockReadGuard> { - self.state.read() + pub fn speculative_state(&self) -> Result>, Error> { + // TODO: ensure the state has done a catch-up. + + Ok(self.state.read()) } /// Returns a read-lock guarded `CheckPoint` struct for reading the head (as chosen by the diff --git a/beacon_node/beacon_chain/src/fork_choice.rs b/beacon_node/beacon_chain/src/fork_choice.rs index b77979b74..74778be32 100644 --- a/beacon_node/beacon_chain/src/fork_choice.rs +++ b/beacon_node/beacon_chain/src/fork_choice.rs @@ -52,7 +52,7 @@ impl ForkChoice { // been justified for at least 1 epoch ... If no such descendant exists, // set justified_head to finalized_head. let (start_state, start_block_root, start_block_slot) = { - let state = chain.current_state(); + let state = &chain.head().beacon_state; let (block_root, block_slot) = if state.current_epoch() + 1 > state.current_justified_checkpoint.epoch { diff --git a/beacon_node/beacon_chain/src/iter.rs b/beacon_node/beacon_chain/src/iter.rs new file mode 100644 index 000000000..f73e88afa --- /dev/null +++ b/beacon_node/beacon_chain/src/iter.rs @@ -0,0 +1,48 @@ +use store::iter::{BlockRootsIterator, StateRootsIterator}; +use types::{Hash256, Slot}; + +pub type ReverseBlockRootIterator<'a, E, S> = + ReverseHashAndSlotIterator>; +pub type ReverseStateRootIterator<'a, E, S> = + ReverseHashAndSlotIterator>; + +pub type ReverseHashAndSlotIterator = ReverseChainIterator<(Hash256, Slot), I>; + +/// Provides a wrapper for an iterator that returns a given `T` before it starts returning results of +/// the `Iterator`. +pub struct ReverseChainIterator { + first_value_used: bool, + first_value: T, + iter: I, +} + +impl ReverseChainIterator +where + T: Sized, + I: Iterator + Sized, +{ + pub fn new(first_value: T, iter: I) -> Self { + Self { + first_value_used: false, + first_value, + iter, + } + } +} + +impl Iterator for ReverseChainIterator +where + T: Clone, + I: Iterator, +{ + type Item = T; + + fn next(&mut self) -> Option { + if self.first_value_used { + self.iter.next() + } else { + self.first_value_used = true; + Some(self.first_value.clone()) + } + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index df1de153a..c2efcad13 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -2,6 +2,7 @@ mod beacon_chain; mod checkpoint; mod errors; mod fork_choice; +mod iter; mod metrics; mod persisted_beacon_chain; pub mod test_utils; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 6242b8a0a..cdcd8bb21 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -198,7 +198,7 @@ where fn get_state_at_slot(&self, state_slot: Slot) -> BeaconState { let state_root = self .chain - .rev_iter_state_roots(self.chain.current_state().slot - 1) + .rev_iter_state_roots(self.chain.head().beacon_state.slot - 1) .find(|(_hash, slot)| *slot == state_slot) .map(|(hash, _slot)| hash) .expect("could not find state root"); diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index ac001415c..215e37e7f 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -266,7 +266,7 @@ impl SimpleSync { fn root_at_slot(&self, target_slot: Slot) -> Option { self.chain - .rev_iter_best_block_roots(target_slot) + .rev_iter_block_roots(target_slot) .take(1) .find(|(_root, slot)| *slot == target_slot) .map(|(root, _slot)| root) @@ -280,6 +280,8 @@ impl SimpleSync { req: BeaconBlockRootsRequest, network: &mut NetworkContext, ) { + let state = &self.chain.head().beacon_state; + debug!( self.log, "BlockRootsRequest"; @@ -290,8 +292,8 @@ impl SimpleSync { let mut roots: Vec = self .chain - .rev_iter_best_block_roots(req.start_slot + req.count) - .take(req.count as usize) + .rev_iter_block_roots(std::cmp::min(req.start_slot + req.count, state.slot)) + .take_while(|(_root, slot)| req.start_slot <= *slot) .map(|(block_root, slot)| BlockRootSlot { slot, block_root }) .collect(); @@ -302,7 +304,7 @@ impl SimpleSync { "peer" => format!("{:?}", peer_id), "msg" => "Failed to return all requested hashes", "start_slot" => req.start_slot, - "current_slot" => self.chain.current_state().slot, + "current_slot" => self.chain.present_slot(), "requested" => req.count, "returned" => roots.len(), ); @@ -389,6 +391,8 @@ impl SimpleSync { req: BeaconBlockHeadersRequest, network: &mut NetworkContext, ) { + let state = &self.chain.head().beacon_state; + debug!( self.log, "BlockHeadersRequest"; @@ -399,13 +403,10 @@ impl SimpleSync { let count = req.max_headers; // Collect the block roots. - // - // Instead of using `chain.rev_iter_blocks` we collect the roots first. This avoids - // unnecessary block deserialization when `req.skip_slots > 0`. let mut roots: Vec = self .chain - .rev_iter_best_block_roots(req.start_slot + count) - .take(count as usize) + .rev_iter_block_roots(std::cmp::min(req.start_slot + count, state.slot)) + .take_while(|(_root, slot)| req.start_slot <= *slot) .map(|(root, _slot)| root) .collect(); diff --git a/beacon_node/rest_api/src/beacon_node.rs b/beacon_node/rest_api/src/beacon_node.rs index 87d2d3cdc..bd8d98a53 100644 --- a/beacon_node/rest_api/src/beacon_node.rs +++ b/beacon_node/rest_api/src/beacon_node.rs @@ -54,7 +54,7 @@ fn get_version(_req: Request) -> APIResult { fn get_genesis_time(req: Request) -> APIResult { let beacon_chain = req.extensions().get::>>().unwrap(); let gen_time = { - let state = beacon_chain.current_state(); + let state = &beacon_chain.head().beacon_state; state.genesis_time }; let body = Body::from( diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index 5ea8368fd..20425d292 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -40,7 +40,11 @@ impl AttestationService for AttestationServiceInstance { // verify the slot, drop lock on state afterwards { let slot_requested = req.get_slot(); - let state = &self.chain.current_state(); + // TODO: this whole module is legacy and not maintained well. + let state = &self + .chain + .speculative_state() + .expect("This is legacy code and should be removed"); // Start by performing some checks // Check that the AttestationData is for the current slot (otherwise it will not be valid) diff --git a/beacon_node/rpc/src/validator.rs b/beacon_node/rpc/src/validator.rs index b13303e25..080c828a7 100644 --- a/beacon_node/rpc/src/validator.rs +++ b/beacon_node/rpc/src/validator.rs @@ -29,7 +29,11 @@ impl ValidatorService for ValidatorServiceInstance { trace!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch()); let spec = &self.chain.spec; - let state = &self.chain.current_state(); + // TODO: this whole module is legacy and not maintained well. + let state = &self + .chain + .speculative_state() + .expect("This is legacy code and should be removed"); let epoch = Epoch::from(req.get_epoch()); let mut resp = GetDutiesResponse::new(); let resp_validators = resp.mut_active_validators(); diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 55c525b11..fc5d80679 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -4,20 +4,23 @@ use std::sync::Arc; use types::{BeaconBlock, BeaconState, BeaconStateError, EthSpec, Hash256, Slot}; /// Implemented for types that have ancestors (e.g., blocks, states) that may be iterated over. +/// +/// ## Note +/// +/// It is assumed that all ancestors for this object are stored in the database. If this is not the +/// case, the iterator will start returning `None` prior to genesis. pub trait AncestorIter { /// Returns an iterator over the roots of the ancestors of `self`. fn try_iter_ancestor_roots(&self, store: Arc) -> Option; } -impl<'a, U: Store, E: EthSpec> AncestorIter> - for BeaconBlock -{ +impl<'a, U: Store, E: EthSpec> AncestorIter> for BeaconBlock { /// Iterates across all the prior block roots of `self`, starting at the most recent and ending /// at genesis. - fn try_iter_ancestor_roots(&self, store: Arc) -> Option> { + fn try_iter_ancestor_roots(&self, store: Arc) -> Option> { let state = store.get::>(&self.state_root).ok()??; - Some(BestBlockRootsIterator::owned(store, state, self.slot)) + Some(BlockRootsIterator::owned(store, state, self.slot)) } } @@ -116,11 +119,6 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockIterator<'a, T, U> { /// exhausted. /// /// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. -/// -/// ## Notes -/// -/// See [`BestBlockRootsIterator`](struct.BestBlockRootsIterator.html), which has different -/// `start_slot` logic. #[derive(Clone)] pub struct BlockRootsIterator<'a, T: EthSpec, U> { store: Arc, @@ -180,104 +178,6 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { } } -/// Iterates backwards through block roots with `start_slot` highest possible value -/// `<= beacon_state.slot`. -/// -/// The distinction between `BestBlockRootsIterator` and `BlockRootsIterator` is: -/// -/// - `BestBlockRootsIterator` uses best-effort slot. When `start_slot` is greater than the latest available block root -/// on `beacon_state`, returns `Some(root, slot)` where `slot` is the latest available block -/// root. -/// - `BlockRootsIterator` is strict about `start_slot`. When `start_slot` is greater than the latest available block root -/// on `beacon_state`, returns `None`. -/// -/// This is distinct from `BestBlockRootsIterator`. -/// -/// Uses the `block_roots` field of `BeaconState` to as the source of block roots and will -/// perform a lookup on the `Store` for a prior `BeaconState` if `block_roots` has been -/// exhausted. -/// -/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. -#[derive(Clone)] -pub struct BestBlockRootsIterator<'a, T: EthSpec, U> { - store: Arc, - beacon_state: Cow<'a, BeaconState>, - slot: Slot, -} - -impl<'a, T: EthSpec, U: Store> BestBlockRootsIterator<'a, T, U> { - /// Create a new iterator over all block roots in the given `beacon_state` and prior states. - pub fn new(store: Arc, beacon_state: &'a BeaconState, start_slot: Slot) -> Self { - let mut slot = start_slot; - if slot >= beacon_state.slot { - // Slot may be too high. - slot = beacon_state.slot; - if beacon_state.get_block_root(slot).is_err() { - slot -= 1; - } - } - - Self { - store, - beacon_state: Cow::Borrowed(beacon_state), - slot: slot + 1, - } - } - - /// Create a new iterator over all block roots in the given `beacon_state` and prior states. - pub fn owned(store: Arc, beacon_state: BeaconState, start_slot: Slot) -> Self { - let mut slot = start_slot; - if slot >= beacon_state.slot { - // Slot may be too high. - slot = beacon_state.slot; - // TODO: Use a function other than `get_block_root` as this will always return `Err()` - // for slot = state.slot. - if beacon_state.get_block_root(slot).is_err() { - slot -= 1; - } - } - - Self { - store, - beacon_state: Cow::Owned(beacon_state), - slot: slot + 1, - } - } -} - -impl<'a, T: EthSpec, U: Store> Iterator for BestBlockRootsIterator<'a, T, U> { - type Item = (Hash256, Slot); - - fn next(&mut self) -> Option { - if self.slot == 0 { - // End of Iterator - return None; - } - - self.slot -= 1; - - match self.beacon_state.get_block_root(self.slot) { - Ok(root) => Some((*root, self.slot)), - Err(BeaconStateError::SlotOutOfBounds) => { - // Read a `BeaconState` from the store that has access to prior historical root. - let beacon_state: BeaconState = { - // Load the earliest state from disk. - let new_state_root = self.beacon_state.get_oldest_state_root().ok()?; - - self.store.get(&new_state_root).ok()? - }?; - - self.beacon_state = Cow::Owned(beacon_state); - - let root = self.beacon_state.get_block_root(self.slot).ok()?; - - Some((*root, self.slot)) - } - _ => None, - } - } -} - #[cfg(test)] mod test { use super::*; @@ -337,49 +237,6 @@ mod test { } } - #[test] - fn best_block_root_iter() { - let store = Arc::new(MemoryStore::open()); - let slots_per_historical_root = MainnetEthSpec::slots_per_historical_root(); - - let mut state_a: BeaconState = get_state(); - let mut state_b: BeaconState = get_state(); - - state_a.slot = Slot::from(slots_per_historical_root); - state_b.slot = Slot::from(slots_per_historical_root * 2); - - let mut hashes = (0..).into_iter().map(|i| Hash256::from(i)); - - for root in &mut state_a.block_roots[..] { - *root = hashes.next().unwrap() - } - for root in &mut state_b.block_roots[..] { - *root = hashes.next().unwrap() - } - - let state_a_root = hashes.next().unwrap(); - state_b.state_roots[0] = state_a_root; - store.put(&state_a_root, &state_a).unwrap(); - - let iter = BestBlockRootsIterator::new(store.clone(), &state_b, state_b.slot); - - assert!( - iter.clone().find(|(_root, slot)| *slot == 0).is_some(), - "iter should contain zero slot" - ); - - let mut collected: Vec<(Hash256, Slot)> = iter.collect(); - collected.reverse(); - - let expected_len = 2 * MainnetEthSpec::slots_per_historical_root(); - - assert_eq!(collected.len(), expected_len); - - for i in 0..expected_len { - assert_eq!(collected[i].0, Hash256::from(i as u64)); - } - } - #[test] fn state_root_iter() { let store = Arc::new(MemoryStore::open()); diff --git a/eth2/lmd_ghost/tests/test.rs b/eth2/lmd_ghost/tests/test.rs index fbe385560..0ac263638 100644 --- a/eth2/lmd_ghost/tests/test.rs +++ b/eth2/lmd_ghost/tests/test.rs @@ -10,7 +10,7 @@ use lmd_ghost::{LmdGhost, ThreadSafeReducedTree as BaseThreadSafeReducedTree}; use rand::{prelude::*, rngs::StdRng}; use std::sync::Arc; use store::{ - iter::{AncestorIter, BestBlockRootsIterator}, + iter::{AncestorIter, BlockRootsIterator}, MemoryStore, Store, }; use types::{BeaconBlock, EthSpec, Hash256, MinimalEthSpec, Slot}; @@ -159,7 +159,7 @@ fn get_ancestor_roots( .expect("block should exist") .expect("store should not error"); - as AncestorIter<_, BestBlockRootsIterator>>::try_iter_ancestor_roots( + as AncestorIter<_, BlockRootsIterator>>::try_iter_ancestor_roots( &block, store, ) .expect("should be able to create ancestor iter")