Add error handling to iterators (#1243)

* Add error handling to iterators

* Review feedback

* Leverage itertools::process_results() in few places
This commit is contained in:
Adam Szkoda 2020-06-10 01:55:44 +02:00 committed by GitHub
parent ed4b3ef471
commit 7f036a6e95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 193 additions and 119 deletions

3
Cargo.lock generated
View File

@ -289,6 +289,7 @@ dependencies = [
"futures 0.3.5", "futures 0.3.5",
"genesis", "genesis",
"integer-sqrt", "integer-sqrt",
"itertools 0.9.0",
"lazy_static", "lazy_static",
"lighthouse_metrics", "lighthouse_metrics",
"log 0.4.8", "log 0.4.8",
@ -2994,6 +2995,7 @@ dependencies = [
"genesis", "genesis",
"hashset_delay", "hashset_delay",
"hex 0.4.2", "hex 0.4.2",
"itertools 0.9.0",
"lazy_static", "lazy_static",
"lighthouse_metrics", "lighthouse_metrics",
"matches", "matches",
@ -3871,6 +3873,7 @@ dependencies = [
"hex 0.4.2", "hex 0.4.2",
"http 0.2.1", "http 0.2.1",
"hyper 0.13.6", "hyper 0.13.6",
"itertools 0.9.0",
"lazy_static", "lazy_static",
"lighthouse_metrics", "lighthouse_metrics",
"network", "network",

View File

@ -48,7 +48,7 @@ bls = { path = "../../crypto/bls" }
safe_arith = { path = "../../consensus/safe_arith" } safe_arith = { path = "../../consensus/safe_arith" }
environment = { path = "../../lighthouse/environment" } environment = { path = "../../lighthouse/environment" }
bus = "2.2.3" bus = "2.2.3"
itertools = "0.9.0"
[dev-dependencies] [dev-dependencies]
lazy_static = "1.4.0" lazy_static = "1.4.0"

View File

@ -23,6 +23,7 @@ use crate::snapshot_cache::SnapshotCache;
use crate::timeout_rw_lock::TimeoutRwLock; use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::BeaconSnapshot; use crate::BeaconSnapshot;
use itertools::process_results;
use operation_pool::{OperationPool, PersistedOperationPool}; use operation_pool::{OperationPool, PersistedOperationPool};
use slog::{crit, debug, error, info, trace, warn, Logger}; use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
@ -319,12 +320,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// - Iterator returns `(Hash256, Slot)`. /// - Iterator returns `(Hash256, Slot)`.
/// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot /// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot
/// returned may be earlier than the wall-clock slot. /// returned may be earlier than the wall-clock slot.
pub fn rev_iter_block_roots(&self) -> Result<impl Iterator<Item = (Hash256, Slot)>, Error> { pub fn rev_iter_block_roots(
&self,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
let head = self.head()?; let head = self.head()?;
let iter = BlockRootsIterator::owned(self.store.clone(), head.beacon_state); let iter = BlockRootsIterator::owned(self.store.clone(), head.beacon_state);
Ok(
Ok(std::iter::once((head.beacon_block_root, head.beacon_block.slot())).chain(iter)) std::iter::once(Ok((head.beacon_block_root, head.beacon_block.slot())))
.chain(iter)
.map(|result| result.map_err(|e| e.into())),
)
} }
pub fn forwards_iter_block_roots( pub fn forwards_iter_block_roots(
@ -339,7 +344,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
local_head.beacon_state, local_head.beacon_state,
local_head.beacon_block_root, local_head.beacon_block_root,
&self.spec, &self.spec,
)) )?)
} }
/// Traverse backwards from `block_root` to find the block roots of its ancestors. /// Traverse backwards from `block_root` to find the block roots of its ancestors.
@ -354,7 +359,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn rev_iter_block_roots_from( pub fn rev_iter_block_roots_from(
&self, &self,
block_root: Hash256, block_root: Hash256,
) -> Result<impl Iterator<Item = (Hash256, Slot)>, Error> { ) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
let block = self let block = self
.get_block(&block_root)? .get_block(&block_root)?
.ok_or_else(|| Error::MissingBeaconBlock(block_root))?; .ok_or_else(|| Error::MissingBeaconBlock(block_root))?;
@ -362,7 +367,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get_state(&block.state_root(), Some(block.slot()))? .get_state(&block.state_root(), Some(block.slot()))?
.ok_or_else(|| Error::MissingBeaconState(block.state_root()))?; .ok_or_else(|| Error::MissingBeaconState(block.state_root()))?;
let iter = BlockRootsIterator::owned(self.store.clone(), state); let iter = BlockRootsIterator::owned(self.store.clone(), state);
Ok(std::iter::once((block_root, block.slot())).chain(iter)) Ok(std::iter::once(Ok((block_root, block.slot())))
.chain(iter)
.map(|result| result.map_err(|e| e.into())))
} }
/// Traverse backwards from `block_root` to find the root of the ancestor block at `slot`. /// Traverse backwards from `block_root` to find the root of the ancestor block at `slot`.
@ -371,10 +378,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256, block_root: Hash256,
slot: Slot, slot: Slot,
) -> Result<Option<Hash256>, Error> { ) -> Result<Option<Hash256>, Error> {
Ok(self process_results(self.rev_iter_block_roots_from(block_root)?, |mut iter| {
.rev_iter_block_roots_from(block_root)? iter.find(|(_, ancestor_slot)| *ancestor_slot == slot)
.find(|(_, ancestor_slot)| *ancestor_slot == slot) .map(|(ancestor_block_root, _)| ancestor_block_root)
.map(|(ancestor_block_root, _)| ancestor_block_root)) })
} }
/// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to /// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to
@ -386,13 +393,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// - Iterator returns `(Hash256, Slot)`. /// - Iterator returns `(Hash256, Slot)`.
/// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot /// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot
/// returned may be earlier than the wall-clock slot. /// returned may be earlier than the wall-clock slot.
pub fn rev_iter_state_roots(&self) -> Result<impl Iterator<Item = (Hash256, Slot)>, Error> { pub fn rev_iter_state_roots(
&self,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
let head = self.head()?; let head = self.head()?;
let slot = head.beacon_state.slot; let slot = head.beacon_state.slot;
let iter = StateRootsIterator::owned(self.store.clone(), head.beacon_state); let iter = StateRootsIterator::owned(self.store.clone(), head.beacon_state);
let iter = std::iter::once(Ok((head.beacon_state_root, slot)))
Ok(std::iter::once((head.beacon_state_root, slot)).chain(iter)) .chain(iter)
.map(|result| result.map_err(Into::into));
Ok(iter)
} }
/// Returns the block at the given slot, if any. Only returns blocks in the canonical chain. /// Returns the block at the given slot, if any. Only returns blocks in the canonical chain.
@ -404,10 +414,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self, &self,
slot: Slot, slot: Slot,
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> { ) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
let root = self let root = process_results(self.rev_iter_block_roots()?, |mut iter| {
.rev_iter_block_roots()? iter.find(|(_, this_slot)| *this_slot == slot)
.find(|(_, this_slot)| *this_slot == slot) .map(|(root, _)| root)
.map(|(root, _)| root); })?;
if let Some(block_root) = root { if let Some(block_root) = root {
Ok(self.store.get_item(&block_root)?) Ok(self.store.get_item(&block_root)?)
@ -553,12 +563,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(state) Ok(state)
} }
Ordering::Less => { Ordering::Less => {
let state_root = self let state_root = process_results(self.rev_iter_state_roots()?, |iter| {
.rev_iter_state_roots()? iter.take_while(|(_, current_slot)| *current_slot >= slot)
.take_while(|(_root, current_slot)| *current_slot >= slot) .find(|(_, current_slot)| *current_slot == slot)
.find(|(_root, current_slot)| *current_slot == slot) .map(|(root, _slot)| root)
.map(|(root, _slot)| root) })?
.ok_or_else(|| Error::NoStateForSlot(slot))?; .ok_or_else(|| Error::NoStateForSlot(slot))?;
Ok(self Ok(self
.get_state(&state_root, Some(slot))? .get_state(&state_root, Some(slot))?
@ -633,10 +643,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// ///
/// Returns None if a block doesn't exist at the slot. /// Returns None if a block doesn't exist at the slot.
pub fn root_at_slot(&self, target_slot: Slot) -> Result<Option<Hash256>, Error> { pub fn root_at_slot(&self, target_slot: Slot) -> Result<Option<Hash256>, Error> {
Ok(self process_results(self.rev_iter_block_roots()?, |mut iter| {
.rev_iter_block_roots()? iter.find(|(_, slot)| *slot == target_slot)
.find(|(_root, slot)| *slot == target_slot) .map(|(root, _)| root)
.map(|(root, _slot)| root)) })
} }
/// Returns the block proposer for a given slot. /// Returns the block proposer for a given slot.

View File

@ -84,9 +84,10 @@ pub trait Migrate<E: EthSpec>: Send + Sync + 'static {
.ok_or_else(|| BeaconStateError::MissingBeaconBlock(head_hash.into()))? .ok_or_else(|| BeaconStateError::MissingBeaconBlock(head_hash.into()))?
.state_root(); .state_root();
let iterator = std::iter::once((head_hash, head_state_hash, head_slot)) let iter = std::iter::once(Ok((head_hash, head_state_hash, head_slot)))
.chain(RootsIterator::from_block(Arc::clone(&store), head_hash)?); .chain(RootsIterator::from_block(Arc::clone(&store), head_hash)?);
for (block_hash, state_hash, slot) in iterator { for maybe_tuple in iter {
let (block_hash, state_hash, slot) = maybe_tuple?;
if slot < old_finalized_slot { if slot < old_finalized_slot {
// We must assume here any candidate chains include old_finalized_block_hash, // We must assume here any candidate chains include old_finalized_block_hash,
// i.e. there aren't any forks starting at a block that is a strict ancestor of // i.e. there aren't any forks starting at a block that is a strict ancestor of

View File

@ -392,6 +392,7 @@ fn delete_blocks_and_states() {
.expect("faulty head state exists"); .expect("faulty head state exists");
let states_to_delete = StateRootsIterator::new(store.clone(), &faulty_head_state) let states_to_delete = StateRootsIterator::new(store.clone(), &faulty_head_state)
.map(Result::unwrap)
.take_while(|(_, slot)| *slot > unforked_blocks) .take_while(|(_, slot)| *slot > unforked_blocks)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -409,6 +410,7 @@ fn delete_blocks_and_states() {
// Deleting the blocks from the fork should remove them completely // Deleting the blocks from the fork should remove them completely
let blocks_to_delete = BlockRootsIterator::new(store.clone(), &faulty_head_state) let blocks_to_delete = BlockRootsIterator::new(store.clone(), &faulty_head_state)
.map(Result::unwrap)
// Extra +1 here accounts for the skipped slot that started this fork // Extra +1 here accounts for the skipped slot that started this fork
.take_while(|(_, slot)| *slot > unforked_blocks + 1) .take_while(|(_, slot)| *slot > unforked_blocks + 1)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -424,6 +426,7 @@ fn delete_blocks_and_states() {
.chain .chain
.rev_iter_state_roots() .rev_iter_state_roots()
.expect("rev iter ok") .expect("rev iter ok")
.map(Result::unwrap)
.filter(|(_, slot)| *slot < split_slot); .filter(|(_, slot)| *slot < split_slot);
for (state_root, slot) in finalized_states { for (state_root, slot) in finalized_states {
@ -659,11 +662,12 @@ fn check_shuffling_compatible(
let previous_pivot_slot = let previous_pivot_slot =
(head_state.previous_epoch() - shuffling_lookahead).end_slot(E::slots_per_epoch()); (head_state.previous_epoch() - shuffling_lookahead).end_slot(E::slots_per_epoch());
for (block_root, slot) in harness for maybe_tuple in harness
.chain .chain
.rev_iter_block_roots_from(head_block_root) .rev_iter_block_roots_from(head_block_root)
.unwrap() .unwrap()
{ {
let (block_root, slot) = maybe_tuple.unwrap();
// Shuffling is compatible targeting the current epoch, // Shuffling is compatible targeting the current epoch,
// iff slot is greater than or equal to the current epoch pivot block // iff slot is greater than or equal to the current epoch pivot block
assert_eq!( assert_eq!(
@ -1364,6 +1368,8 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
head.beacon_block_root, head.beacon_block_root,
&harness.spec, &harness.spec,
) )
.unwrap()
.map(Result::unwrap)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// Drop the block roots for skipped slots. // Drop the block roots for skipped slots.
@ -1387,6 +1393,7 @@ fn check_iterators(harness: &TestHarness) {
.rev_iter_state_roots() .rev_iter_state_roots()
.expect("should get iter") .expect("should get iter")
.last() .last()
.map(Result::unwrap)
.map(|(_, slot)| slot), .map(|(_, slot)| slot),
Some(Slot::new(0)) Some(Slot::new(0))
); );
@ -1396,6 +1403,7 @@ fn check_iterators(harness: &TestHarness) {
.rev_iter_block_roots() .rev_iter_block_roots()
.expect("should get iter") .expect("should get iter")
.last() .last()
.map(Result::unwrap)
.map(|(_, slot)| slot), .map(|(_, slot)| slot),
Some(Slot::new(0)) Some(Slot::new(0))
); );

View File

@ -73,11 +73,13 @@ fn iterators() {
.chain .chain
.rev_iter_block_roots() .rev_iter_block_roots()
.expect("should get iter") .expect("should get iter")
.map(Result::unwrap)
.collect(); .collect();
let state_roots: Vec<(Hash256, Slot)> = harness let state_roots: Vec<(Hash256, Slot)> = harness
.chain .chain
.rev_iter_state_roots() .rev_iter_state_roots()
.expect("should get iter") .expect("should get iter")
.map(Result::unwrap)
.collect(); .collect();
assert_eq!( assert_eq!(

View File

@ -37,3 +37,4 @@ rlp = "0.4.5"
lazy_static = "1.4.0" lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" } lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
environment = { path = "../../lighthouse/environment" } environment = { path = "../../lighthouse/environment" }
itertools = "0.9.0"

View File

@ -9,6 +9,7 @@ use beacon_chain::{
}; };
use eth2_libp2p::rpc::*; use eth2_libp2p::rpc::*;
use eth2_libp2p::{NetworkGlobals, PeerId, Request, Response}; use eth2_libp2p::{NetworkGlobals, PeerId, Request, Response};
use itertools::process_results;
use slog::{debug, error, o, trace, warn}; use slog::{debug, error, o, trace, warn};
use ssz::Encode; use ssz::Encode;
use std::sync::Arc; use std::sync::Arc;
@ -357,20 +358,29 @@ impl<T: BeaconChainTypes> Processor<T> {
// pick out the required blocks, ignoring skip-slots and stepping by the step parameter; // pick out the required blocks, ignoring skip-slots and stepping by the step parameter;
let mut last_block_root = None; let mut last_block_root = None;
let block_roots = forwards_block_root_iter let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
.take_while(|(_root, slot)| slot.as_u64() < req.start_slot + req.count * req.step) iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot + req.count * req.step)
// map skip slots to None // map skip slots to None
.map(|(root, _slot)| { .map(|(root, _)| {
let result = if Some(root) == last_block_root { let result = if Some(root) == last_block_root {
None None
} else { } else {
Some(root) Some(root)
}; };
last_block_root = Some(root); last_block_root = Some(root);
result result
}) })
.step_by(req.step as usize) .step_by(req.step as usize)
.collect::<Vec<_>>(); .collect::<Vec<Option<Hash256>>>()
});
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
error!(self.log, "Error during iteration over blocks"; "error" => format!("{:?}", e));
return;
}
};
// remove all skip slots // remove all skip slots
let block_roots = block_roots let block_roots = block_roots

View File

@ -39,6 +39,7 @@ rayon = "1.3.0"
environment = { path = "../../lighthouse/environment" } environment = { path = "../../lighthouse/environment" }
uhttp_sse = "0.5.1" uhttp_sse = "0.5.1"
bus = "2.2.3" bus = "2.2.3"
itertools = "0.9.0"
[dev-dependencies] [dev-dependencies]
assert_matches = "1.3.0" assert_matches = "1.3.0"

View File

@ -5,6 +5,7 @@ use eth2_libp2p::PubsubMessage;
use hex; use hex;
use http::header; use http::header;
use hyper::{Body, Request}; use hyper::{Body, Request};
use itertools::process_results;
use network::NetworkMessage; use network::NetworkMessage;
use ssz::Decode; use ssz::Decode;
use store::{iter::AncestorIter, Store}; use store::{iter::AncestorIter, Store};
@ -118,11 +119,14 @@ pub fn block_root_at_slot<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>, beacon_chain: &BeaconChain<T>,
target: Slot, target: Slot,
) -> Result<Option<Hash256>, ApiError> { ) -> Result<Option<Hash256>, ApiError> {
Ok(beacon_chain Ok(process_results(
.rev_iter_block_roots()? beacon_chain.rev_iter_block_roots()?,
.take_while(|(_root, slot)| *slot >= target) |iter| {
.find(|(_root, slot)| *slot == target) iter.take_while(|(_, slot)| *slot >= target)
.map(|(root, _slot)| root)) .find(|(_, slot)| *slot == target)
.map(|(root, _)| root)
},
)?)
} }
/// Returns a `BeaconState` and it's root in the canonical chain of `beacon_chain` at the given /// Returns a `BeaconState` and it's root in the canonical chain of `beacon_chain` at the given
@ -190,12 +194,15 @@ pub fn state_root_at_slot<T: BeaconChainTypes>(
// //
// Iterate through the state roots on the head state to find the root for that // Iterate through the state roots on the head state to find the root for that
// slot. Once the root is found, load it from the database. // slot. Once the root is found, load it from the database.
Ok(head_state process_results(
.try_iter_ancestor_roots(beacon_chain.store.clone()) head_state
.ok_or_else(|| ApiError::ServerError("Failed to create roots iterator".to_string()))? .try_iter_ancestor_roots(beacon_chain.store.clone())
.find(|(_root, s)| *s == slot) .ok_or_else(|| {
.map(|(root, _slot)| root) ApiError::ServerError("Failed to create roots iterator".to_string())
.ok_or_else(|| ApiError::NotFound(format!("Unable to find state at slot {}", slot)))?) })?,
|mut iter| iter.find(|(_, s)| *s == slot).map(|(root, _)| root),
)?
.ok_or_else(|| ApiError::NotFound(format!("Unable to find state at slot {}", slot)))
} else { } else {
// 4. The request slot is later than the head slot. // 4. The request slot is later than the head slot.
// //

View File

@ -759,6 +759,7 @@ fn get_genesis_state_root() {
.expect("should have beacon chain") .expect("should have beacon chain")
.rev_iter_state_roots() .rev_iter_state_roots()
.expect("should get iter") .expect("should get iter")
.map(Result::unwrap)
.find(|(_cur_root, cur_slot)| slot == *cur_slot) .find(|(_cur_root, cur_slot)| slot == *cur_slot)
.map(|(cur_root, _)| cur_root) .map(|(cur_root, _)| cur_root)
.expect("chain should have state root at slot"); .expect("chain should have state root at slot");
@ -786,6 +787,7 @@ fn get_genesis_block_root() {
.expect("should have beacon chain") .expect("should have beacon chain")
.rev_iter_block_roots() .rev_iter_block_roots()
.expect("should get iter") .expect("should get iter")
.map(Result::unwrap)
.find(|(_cur_root, cur_slot)| slot == *cur_slot) .find(|(_cur_root, cur_slot)| slot == *cur_slot)
.map(|(cur_root, _)| cur_root) .map(|(cur_root, _)| cur_root)
.expect("chain should have state root at slot"); .expect("chain should have state root at slot");

View File

@ -3,6 +3,8 @@ use crate::hot_cold_store::HotColdDBError;
use ssz::DecodeError; use ssz::DecodeError;
use types::{BeaconStateError, Hash256}; use types::{BeaconStateError, Hash256};
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
SszDecodeError(DecodeError), SszDecodeError(DecodeError),
@ -13,6 +15,7 @@ pub enum Error {
DBError { message: String }, DBError { message: String },
RlpError(String), RlpError(String),
BlockNotFound(Hash256), BlockNotFound(Hash256),
NoContinuationData,
} }
impl From<DecodeError> for Error { impl From<DecodeError> for Error {

View File

@ -1,8 +1,9 @@
use crate::chunked_iter::ChunkedVectorIter; use crate::chunked_iter::ChunkedVectorIter;
use crate::chunked_vector::BlockRoots; use crate::chunked_vector::BlockRoots;
use crate::errors::{Error, Result};
use crate::iter::BlockRootsIterator; use crate::iter::BlockRootsIterator;
use crate::{HotColdDB, Store}; use crate::{HotColdDB, Store};
use slog::error; use itertools::process_results;
use std::sync::Arc; use std::sync::Arc;
use types::{BeaconState, ChainSpec, EthSpec, Hash256, Slot}; use types::{BeaconState, ChainSpec, EthSpec, Hash256, Slot};
@ -63,22 +64,26 @@ impl SimpleForwardsBlockRootsIterator {
start_slot: Slot, start_slot: Slot,
end_state: BeaconState<E>, end_state: BeaconState<E>,
end_block_root: Hash256, end_block_root: Hash256,
) -> Self { ) -> Result<Self> {
// Iterate backwards from the end state, stopping at the start slot. // Iterate backwards from the end state, stopping at the start slot.
let iter = std::iter::once((end_block_root, end_state.slot)) let values = process_results(
.chain(BlockRootsIterator::owned(store, end_state)); std::iter::once(Ok((end_block_root, end_state.slot)))
Self { .chain(BlockRootsIterator::owned(store, end_state)),
values: iter.take_while(|(_, slot)| *slot >= start_slot).collect(), |iter| {
} iter.take_while(|(_, slot)| *slot >= start_slot)
.collect::<Vec<_>>()
},
)?;
Ok(Self { values: values })
} }
} }
impl Iterator for SimpleForwardsBlockRootsIterator { impl Iterator for SimpleForwardsBlockRootsIterator {
type Item = (Hash256, Slot); type Item = Result<(Hash256, Slot)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
// Pop from the end of the vector to get the block roots in slot-ascending order. // Pop from the end of the vector to get the block roots in slot-ascending order.
self.values.pop() Ok(self.values.pop()).transpose()
} }
} }
@ -89,12 +94,12 @@ impl<E: EthSpec> HybridForwardsBlockRootsIterator<E> {
end_state: BeaconState<E>, end_state: BeaconState<E>,
end_block_root: Hash256, end_block_root: Hash256,
spec: &ChainSpec, spec: &ChainSpec,
) -> Self { ) -> Result<Self> {
use HybridForwardsBlockRootsIterator::*; use HybridForwardsBlockRootsIterator::*;
let latest_restore_point_slot = store.get_latest_restore_point_slot(); let latest_restore_point_slot = store.get_latest_restore_point_slot();
if start_slot < latest_restore_point_slot { let result = if start_slot < latest_restore_point_slot {
PreFinalization { PreFinalization {
iter: Box::new(FrozenForwardsBlockRootsIterator::new( iter: Box::new(FrozenForwardsBlockRootsIterator::new(
store, store,
@ -111,16 +116,14 @@ impl<E: EthSpec> HybridForwardsBlockRootsIterator<E> {
start_slot, start_slot,
end_state, end_state,
end_block_root, end_block_root,
), )?,
} }
} };
Ok(result)
} }
}
impl<E: EthSpec> Iterator for HybridForwardsBlockRootsIterator<E> { fn do_next(&mut self) -> Result<Option<(Hash256, Slot)>> {
type Item = (Hash256, Slot);
fn next(&mut self) -> Option<Self::Item> {
use HybridForwardsBlockRootsIterator::*; use HybridForwardsBlockRootsIterator::*;
match self { match self {
@ -129,19 +132,13 @@ impl<E: EthSpec> Iterator for HybridForwardsBlockRootsIterator<E> {
continuation_data, continuation_data,
} => { } => {
match iter.next() { match iter.next() {
Some(x) => Some(x), Some(x) => Ok(Some(x)),
// Once the pre-finalization iterator is consumed, transition // Once the pre-finalization iterator is consumed, transition
// to a post-finalization iterator beginning from the last slot // to a post-finalization iterator beginning from the last slot
// of the pre iterator. // of the pre iterator.
None => { None => {
let (end_state, end_block_root) = let (end_state, end_block_root) =
continuation_data.take().or_else(|| { continuation_data.take().ok_or(Error::NoContinuationData)?;
error!(
iter.inner.store.log,
"HybridForwardsBlockRootsIterator: logic error"
);
None
})?;
*self = PostFinalization { *self = PostFinalization {
iter: SimpleForwardsBlockRootsIterator::new( iter: SimpleForwardsBlockRootsIterator::new(
@ -149,13 +146,21 @@ impl<E: EthSpec> Iterator for HybridForwardsBlockRootsIterator<E> {
Slot::from(iter.inner.end_vindex), Slot::from(iter.inner.end_vindex),
end_state, end_state,
end_block_root, end_block_root,
), )?,
}; };
self.next() self.do_next()
} }
} }
} }
PostFinalization { iter } => iter.next(), PostFinalization { iter } => iter.next().transpose(),
} }
} }
} }
impl<E: EthSpec> Iterator for HybridForwardsBlockRootsIterator<E> {
type Item = Result<(Hash256, Slot)>;
fn next(&mut self) -> Option<Self::Item> {
self.do_next().transpose()
}
}

View File

@ -190,7 +190,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
end_state: BeaconState<E>, end_state: BeaconState<E>,
end_block_root: Hash256, end_block_root: Hash256,
spec: &ChainSpec, spec: &ChainSpec,
) -> Self::ForwardsBlockRootsIterator { ) -> Result<Self::ForwardsBlockRootsIterator, Error> {
HybridForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root, spec) HybridForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root, spec)
} }
@ -708,7 +708,11 @@ pub fn process_finalization<E: EthSpec>(
let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head); let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head);
let mut to_delete = vec![]; let mut to_delete = vec![];
for (state_root, slot) in state_root_iter.take_while(|&(_, slot)| slot >= current_split_slot) { for maybe_pair in state_root_iter.take_while(|result| match result {
Ok((_, slot)) => slot >= &current_split_slot,
Err(_) => true,
}) {
let (state_root, slot) = maybe_pair?;
if slot % store.config.slots_per_restore_point == 0 { if slot % store.config.slots_per_restore_point == 0 {
let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root)? let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root)?
.ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?; .ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?;

View File

@ -69,12 +69,12 @@ impl<'a, T: EthSpec, U: Store<T>> StateRootsIterator<'a, T, U> {
} }
impl<'a, T: EthSpec, U: Store<T>> Iterator for StateRootsIterator<'a, T, U> { impl<'a, T: EthSpec, U: Store<T>> Iterator for StateRootsIterator<'a, T, U> {
type Item = (Hash256, Slot); type Item = Result<(Hash256, Slot), Error>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
self.inner self.inner
.next() .next()
.map(|(_, state_root, slot)| (state_root, slot)) .map(|result| result.map(|(_, state_root, slot)| (state_root, slot)))
} }
} }
@ -115,12 +115,12 @@ impl<'a, T: EthSpec, U: Store<T>> BlockRootsIterator<'a, T, U> {
} }
impl<'a, T: EthSpec, U: Store<T>> Iterator for BlockRootsIterator<'a, T, U> { impl<'a, T: EthSpec, U: Store<T>> Iterator for BlockRootsIterator<'a, T, U> {
type Item = (Hash256, Slot); type Item = Result<(Hash256, Slot), Error>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
self.inner self.inner
.next() .next()
.map(|(block_root, _, slot)| (block_root, slot)) .map(|result| result.map(|(block_root, _, slot)| (block_root, slot)))
} }
} }
@ -167,15 +167,10 @@ impl<'a, T: EthSpec, U: Store<T>> RootsIterator<'a, T, U> {
.ok_or_else(|| BeaconStateError::MissingBeaconState(block.state_root().into()))?; .ok_or_else(|| BeaconStateError::MissingBeaconState(block.state_root().into()))?;
Ok(Self::owned(store, state)) Ok(Self::owned(store, state))
} }
}
impl<'a, T: EthSpec, U: Store<T>> Iterator for RootsIterator<'a, T, U> { fn do_next(&mut self) -> Result<Option<(Hash256, Hash256, Slot)>, Error> {
/// (block_root, state_root, slot)
type Item = (Hash256, Hash256, Slot);
fn next(&mut self) -> Option<Self::Item> {
if self.slot == 0 || self.slot > self.beacon_state.slot { if self.slot == 0 || self.slot > self.beacon_state.slot {
return None; return Ok(None);
} }
self.slot -= 1; self.slot -= 1;
@ -184,7 +179,7 @@ impl<'a, T: EthSpec, U: Store<T>> Iterator for RootsIterator<'a, T, U> {
self.beacon_state.get_block_root(self.slot), self.beacon_state.get_block_root(self.slot),
self.beacon_state.get_state_root(self.slot), self.beacon_state.get_state_root(self.slot),
) { ) {
(Ok(block_root), Ok(state_root)) => Some((*block_root, *state_root, self.slot)), (Ok(block_root), Ok(state_root)) => Ok(Some((*block_root, *state_root, self.slot))),
(Err(BeaconStateError::SlotOutOfBounds), Err(BeaconStateError::SlotOutOfBounds)) => { (Err(BeaconStateError::SlotOutOfBounds), Err(BeaconStateError::SlotOutOfBounds)) => {
// Read a `BeaconState` from the store that has access to prior historical roots. // Read a `BeaconState` from the store that has access to prior historical roots.
let beacon_state = let beacon_state =
@ -192,16 +187,26 @@ impl<'a, T: EthSpec, U: Store<T>> Iterator for RootsIterator<'a, T, U> {
self.beacon_state = Cow::Owned(beacon_state); self.beacon_state = Cow::Owned(beacon_state);
let block_root = *self.beacon_state.get_block_root(self.slot).ok()?; let block_root = *self.beacon_state.get_block_root(self.slot)?;
let state_root = *self.beacon_state.get_state_root(self.slot).ok()?; let state_root = *self.beacon_state.get_state_root(self.slot)?;
Some((block_root, state_root, self.slot)) Ok(Some((block_root, state_root, self.slot)))
} }
_ => None, (Err(e), _) => Err(e.into()),
(Ok(_), Err(e)) => Err(e.into()),
} }
} }
} }
impl<'a, T: EthSpec, U: Store<T>> Iterator for RootsIterator<'a, T, U> {
/// (block_root, state_root, slot)
type Item = Result<(Hash256, Hash256, Slot), Error>;
fn next(&mut self) -> Option<Self::Item> {
self.do_next().transpose()
}
}
/// Block iterator that uses the `parent_root` of each block to backtrack. /// Block iterator that uses the `parent_root` of each block to backtrack.
pub struct ParentRootBlockIterator<'a, E: EthSpec, S: Store<E>> { pub struct ParentRootBlockIterator<'a, E: EthSpec, S: Store<E>> {
store: &'a S, store: &'a S,
@ -263,14 +268,22 @@ impl<'a, T: EthSpec, U: Store<T>> BlockIterator<'a, T, U> {
roots: BlockRootsIterator::owned(store, beacon_state), roots: BlockRootsIterator::owned(store, beacon_state),
} }
} }
fn do_next(&mut self) -> Result<Option<SignedBeaconBlock<T>>, Error> {
if let Some(result) = self.roots.next() {
let (root, _slot) = result?;
self.roots.inner.store.get_block(&root)
} else {
Ok(None)
}
}
} }
impl<'a, T: EthSpec, U: Store<T>> Iterator for BlockIterator<'a, T, U> { impl<'a, T: EthSpec, U: Store<T>> Iterator for BlockIterator<'a, T, U> {
type Item = SignedBeaconBlock<T>; type Item = Result<SignedBeaconBlock<T>, Error>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
let (root, _slot) = self.roots.next()?; self.do_next().transpose()
self.roots.inner.store.get_block(&root).ok()?
} }
} }
@ -278,14 +291,16 @@ impl<'a, T: EthSpec, U: Store<T>> Iterator for BlockIterator<'a, T, U> {
fn next_historical_root_backtrack_state<E: EthSpec, S: Store<E>>( fn next_historical_root_backtrack_state<E: EthSpec, S: Store<E>>(
store: &S, store: &S,
current_state: &BeaconState<E>, current_state: &BeaconState<E>,
) -> Option<BeaconState<E>> { ) -> Result<BeaconState<E>, Error> {
// For compatibility with the freezer database's restore points, we load a state at // For compatibility with the freezer database's restore points, we load a state at
// a restore point slot (thus avoiding replaying blocks). In the case where we're // a restore point slot (thus avoiding replaying blocks). In the case where we're
// not frozen, this just means we might not jump back by the maximum amount on // not frozen, this just means we might not jump back by the maximum amount on
// our first jump (i.e. at most 1 extra state load). // our first jump (i.e. at most 1 extra state load).
let new_state_slot = slot_of_prev_restore_point::<E>(current_state.slot); let new_state_slot = slot_of_prev_restore_point::<E>(current_state.slot);
let new_state_root = current_state.get_state_root(new_state_slot).ok()?; let new_state_root = current_state.get_state_root(new_state_slot)?;
store.get_state(new_state_root, Some(new_state_slot)).ok()? Ok(store
.get_state(new_state_root, Some(new_state_slot))?
.ok_or_else(|| BeaconStateError::MissingBeaconState((*new_state_root).into()))?)
} }
/// Compute the slot of the last guaranteed restore point in the freezer database. /// Compute the slot of the last guaranteed restore point in the freezer database.
@ -337,11 +352,12 @@ mod test {
let iter = BlockRootsIterator::new(store, &state_b); let iter = BlockRootsIterator::new(store, &state_b);
assert!( assert!(
iter.clone().any(|(_root, slot)| slot == 0), iter.clone()
.any(|result| result.map(|(_root, slot)| slot == 0).unwrap()),
"iter should contain zero slot" "iter should contain zero slot"
); );
let mut collected: Vec<(Hash256, Slot)> = iter.collect(); let mut collected: Vec<(Hash256, Slot)> = iter.collect::<Result<Vec<_>, _>>().unwrap();
collected.reverse(); collected.reverse();
let expected_len = 2 * MainnetEthSpec::slots_per_historical_root(); let expected_len = 2 * MainnetEthSpec::slots_per_historical_root();
@ -386,11 +402,12 @@ mod test {
let iter = StateRootsIterator::new(store, &state_b); let iter = StateRootsIterator::new(store, &state_b);
assert!( assert!(
iter.clone().any(|(_root, slot)| slot == 0), iter.clone()
.any(|result| result.map(|(_root, slot)| slot == 0).unwrap()),
"iter should contain zero slot" "iter should contain zero slot"
); );
let mut collected: Vec<(Hash256, Slot)> = iter.collect(); let mut collected: Vec<(Hash256, Slot)> = iter.collect::<Result<Vec<_>, _>>().unwrap();
collected.reverse(); collected.reverse();
let expected_len = MainnetEthSpec::slots_per_historical_root() * 2; let expected_len = MainnetEthSpec::slots_per_historical_root() * 2;

View File

@ -13,7 +13,7 @@ extern crate lazy_static;
pub mod chunked_iter; pub mod chunked_iter;
pub mod chunked_vector; pub mod chunked_vector;
pub mod config; pub mod config;
mod errors; pub mod errors;
mod forwards_iter; mod forwards_iter;
pub mod hot_cold_store; pub mod hot_cold_store;
mod impls; mod impls;
@ -99,7 +99,7 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
/// columns. A simple column implementation might involve prefixing a key with some bytes unique to /// columns. A simple column implementation might involve prefixing a key with some bytes unique to
/// each column. /// each column.
pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static { pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
type ForwardsBlockRootsIterator: Iterator<Item = (Hash256, Slot)>; type ForwardsBlockRootsIterator: Iterator<Item = Result<(Hash256, Slot), Error>>;
/// Store a block in the store. /// Store a block in the store.
fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock<E>) -> Result<(), Error>; fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock<E>) -> Result<(), Error>;
@ -146,7 +146,7 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
end_state: BeaconState<E>, end_state: BeaconState<E>,
end_block_root: Hash256, end_block_root: Hash256,
spec: &ChainSpec, spec: &ChainSpec,
) -> Self::ForwardsBlockRootsIterator; ) -> Result<Self::ForwardsBlockRootsIterator, Error>;
fn load_epoch_boundary_state( fn load_epoch_boundary_state(
&self, &self,

View File

@ -149,7 +149,7 @@ impl<E: EthSpec> Store<E> for MemoryStore<E> {
end_state: BeaconState<E>, end_state: BeaconState<E>,
end_block_root: Hash256, end_block_root: Hash256,
_: &ChainSpec, _: &ChainSpec,
) -> Self::ForwardsBlockRootsIterator { ) -> Result<Self::ForwardsBlockRootsIterator, Error> {
SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root) SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root)
} }