Advance state to next slot after importing block (#2174)

## Issue Addressed

NA

## Proposed Changes

Add an optimization that moves `per_slot_processing` from the *leading-edge* of block processing to the *trailing-edge*. Ultimately, this allows us to import the block at slot `n` faster, because the tail-end of slot `n - 1` has already been used to perform `per_slot_processing`.

Additionally, add a "block proposer cache" which stores the block proposers for a given epoch. Since we're now doing trailing-edge `per_slot_processing`, we can prime this cache with the values for the next epoch before blocks from that epoch arrive (assuming those blocks don't involve some unexpected forking).
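
To illustrate the intended call pattern, here is a rough sketch. The `proposer_for_slot` helper is hypothetical and not part of this PR; `BeaconProposerCache::{get, insert}` and `BeaconState::get_beacon_proposer_indices` are the items added here, and the error type is assumed to be `BeaconStateError`:

```rust
use types::{BeaconState, BeaconStateError, ChainSpec, EthSpec, Hash256, Slot};

/// Hypothetical helper showing the intended call pattern: try the cache first,
/// fall back to the state, then prime the cache for the rest of the epoch.
fn proposer_for_slot<E: EthSpec>(
    cache: &mut BeaconProposerCache,
    shuffling_decision_block: Hash256,
    slot: Slot,
    state: &BeaconState<E>,
    spec: &ChainSpec,
) -> Result<usize, BeaconStateError> {
    // Fast path: a previous block (or the state advance timer) has already
    // primed the cache for this epoch, so no state access is required.
    if let Some(proposer) = cache.get::<E>(shuffling_decision_block, slot) {
        return Ok(proposer.index);
    }

    // Slow path: compute every proposer for the epoch in one pass, then cache
    // them so the remaining slots of the epoch become cache hits.
    let proposers = state.get_beacon_proposer_indices(spec)?;
    let index = proposers[slot.as_usize() % E::slots_per_epoch() as usize];
    cache.insert(
        slot.epoch(E::slots_per_epoch()),
        shuffling_decision_block,
        proposers,
        state.fork,
    )?;

    Ok(index)
}
```

The gossip-verification path in the diff follows the same shape, except that on a cache miss it must also load the parent snapshot to obtain a state capable of answering the query.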

There were several ancillary changes required to achieve this: 

- Remove the `beacon_state_root` field from `BeaconSnapshot`, since there's no need to know it on a `pre_state` and in all other cases we can simply read it from `block.state_root()`.
    - This caused some "dust" changes from `snapshot.beacon_state_root` to `snapshot.beacon_state_root()`, where the `BeaconSnapshot::beacon_state_root()` function just reads the state root from the block.
- Rename `types::ShufflingId` to `AttestationShufflingId`. I originally did this because I added a `ProposerShufflingId` struct which turned out not to be very useful. I thought the new name was more descriptive, so I kept it.
- Address https://github.com/ethereum/eth2.0-specs/pull/2196 by rejecting gossip blocks whose slot is not greater than their parent's slot.
- Add a debug log when we get a block with an unknown parent. There was previously no logging around this case.
- Add a function to `BeaconState` to compute all proposers for an epoch without re-computing the active indices for each slot.
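
As a rough sketch of that last point (the comparison function is purely illustrative; `get_beacon_proposer_index` is the pre-existing per-slot accessor and `Epoch::slot_iter` is assumed to be available):

```rust
use types::{BeaconState, BeaconStateError, ChainSpec, EthSpec};

/// Illustrative only: both loops should yield the same proposers, but the
/// first re-derives the active validator indices for every slot, while the
/// new `get_beacon_proposer_indices` derives them once for the whole epoch.
fn compare_proposer_lookups<E: EthSpec>(
    state: &BeaconState<E>,
    spec: &ChainSpec,
) -> Result<(), BeaconStateError> {
    let mut per_slot = Vec::new();
    for slot in state.current_epoch().slot_iter(E::slots_per_epoch()) {
        per_slot.push(state.get_beacon_proposer_index(slot, spec)?);
    }

    let per_epoch = state.get_beacon_proposer_indices(spec)?;
    assert_eq!(per_slot, per_epoch);
    Ok(())
}
```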

## Additional Info

- ~~Blocked on #2173~~
- ~~Blocked on #2179~~ That PR was wrapped into this PR.
- There are potentially some places where we could avoid computing the proposer indices in `per_block_processing`, but I haven't done that here. That would be an optimization beyond the issue at hand (improving block propagation times), and I think this PR is already doing enough. We can come back to it later.

## TODO

- [x] Tidy, improve comments.
- [x] ~~Try to avoid computing the proposer index in `per_block_processing`?~~
Paul Hauner 2021-02-15 07:17:52 +00:00
parent 3000f3e5da
commit 88cc222204
25 changed files with 1066 additions and 135 deletions

View File

@ -2,6 +2,7 @@ use crate::attestation_verification::{
Error as AttestationError, SignatureVerifiedAttestation, VerifiedAggregatedAttestation,
VerifiedUnaggregatedAttestation,
};
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::block_verification::{
check_block_is_finalized_descendant, check_block_relevancy, get_block_root,
signature_verify_chain_segment, BlockError, FullyVerifiedBlock, GossipVerifiedBlock,
@ -24,7 +25,7 @@ use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::snapshot_cache::SnapshotCache;
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::{
get_block_delay_ms, timestamp_now, ValidatorMonitor,
get_block_delay_ms, get_slot_delay_ms, timestamp_now, ValidatorMonitor,
HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS,
};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
@ -231,8 +232,10 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub(crate) head_tracker: Arc<HeadTracker>,
/// A cache dedicated to block processing.
pub(crate) snapshot_cache: TimeoutRwLock<SnapshotCache<T::EthSpec>>,
/// Caches the shuffling for a given epoch and state root.
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub(crate) shuffling_cache: TimeoutRwLock<ShufflingCache>,
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
pub(crate) beacon_proposer_cache: Mutex<BeaconProposerCache>,
/// Caches a map of `validator_index -> validator_pubkey`.
pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache>,
/// A list of any hard-coded forks that have been disabled.
@ -453,9 +456,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
let head = self.head()?;
let slot = head.beacon_state.slot;
let head_slot = head.beacon_state.slot;
let head_state_root = head.beacon_state_root();
let iter = StateRootsIterator::owned(self.store.clone(), head.beacon_state);
let iter = std::iter::once(Ok((head.beacon_state_root, slot)))
let iter = std::iter::once(Ok((head_state_root, head_slot)))
.chain(iter)
.map(|result| result.map_err(Into::into));
Ok(iter)
@ -599,7 +603,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(HeadInfo {
slot: head.beacon_block.slot(),
block_root: head.beacon_block_root,
state_root: head.beacon_state_root,
state_root: head.beacon_state_root(),
current_justified_checkpoint: head.beacon_state.current_justified_checkpoint,
finalized_checkpoint: head.beacon_state.finalized_checkpoint,
fork: head.beacon_state.fork,
@ -1549,7 +1553,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
for relative_epoch in &[RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = ShufflingId::new(block_root, &state, *relative_epoch)?;
let shuffling_id = AttestationShufflingId::new(block_root, &state, *relative_epoch)?;
let shuffling_is_cached = self
.shuffling_cache
@ -1727,19 +1731,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.ok_or(Error::SnapshotCacheLockTimeout)
.map(|mut snapshot_cache| {
snapshot_cache.insert(BeaconSnapshot {
beacon_state: state,
beacon_state_root: signed_block.state_root(),
beacon_block: signed_block,
beacon_block_root: block_root,
});
snapshot_cache.insert(
BeaconSnapshot {
beacon_state: state,
beacon_block: signed_block,
beacon_block_root: block_root,
},
None,
)
})
.unwrap_or_else(|| {
.unwrap_or_else(|e| {
error!(
self.log,
"Failed to obtain cache write lock";
"lock" => "snapshot_cache",
"Failed to insert snapshot";
"error" => ?e,
"task" => "process block"
);
});
@ -1747,7 +1754,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.head_tracker
.register_block(block_root, parent_root, slot);
// send an event to the `events` endpoint after fully processing the block
// Send an event to the `events` endpoint after fully processing the block.
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_block_subscribers() {
event_handler.register(EventKind::Block(SseBlock {
@ -2021,7 +2028,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
beacon_block,
beacon_block_root,
beacon_state,
beacon_state_root,
})
})
.and_then(|mut snapshot| {
@ -2096,7 +2102,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let update_head_timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES);
// These fields are used for server-sent events
let state_root = new_head.beacon_state_root;
let state_root = new_head.beacon_state_root();
let head_slot = new_head.beacon_state.slot;
let target_epoch_start_slot = new_head
.beacon_state
@ -2116,6 +2122,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
metrics::stop_timer(update_head_timer);
// Observe the delay between the start of the slot and when we set the block as head.
metrics::observe_duration(
&metrics::BEACON_BLOCK_HEAD_SLOT_START_DELAY_TIME,
get_slot_delay_ms(timestamp_now(), head_slot, &self.slot_clock),
);
self.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.map(|mut snapshot_cache| {
@ -2458,7 +2470,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
beacon_block: self.head()?.beacon_block,
beacon_block_root: self.head()?.beacon_block_root,
beacon_state: self.head()?.beacon_state,
beacon_state_root: self.head()?.beacon_state_root,
};
dump.push(last_slot.clone());
@ -2485,7 +2496,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
beacon_block,
beacon_block_root,
beacon_state,
beacon_state_root,
};
dump.push(slot.clone());

View File

@ -210,7 +210,7 @@ where
let anchor_state = &anchor.beacon_state;
let mut anchor_block_header = anchor_state.latest_block_header.clone();
if anchor_block_header.state_root == Hash256::zero() {
anchor_block_header.state_root = anchor.beacon_state_root;
anchor_block_header.state_root = anchor.beacon_state_root();
}
let anchor_root = anchor_block_header.canonical_root();
let anchor_epoch = anchor_state.current_epoch();

View File

@ -0,0 +1,113 @@
//! The `BeaconProposer` cache stores the proposer indices for some epoch.
//!
//! This cache is keyed by `(epoch, block_root)` where `block_root` is the block root at
//! `end_slot(epoch - 1)`. We make the assertion that the proposer shuffling is identical for all
//! blocks in `epoch` which share the common ancestor of `block_root`.
//!
//! The cache is a fairly unintelligent LRU cache that is not pruned after finality. This makes it
//! very simple to reason about, but it might store values that are useless due to finalization. The
//! values it stores are very small, so this should not be an issue.
use lru::LruCache;
use smallvec::SmallVec;
use types::{BeaconStateError, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned};
/// The number of sets of proposer indices that should be cached.
const CACHE_SIZE: usize = 16;
/// This value is fairly unimportant, it's used to avoid heap allocations. The result of it being
/// incorrect is non-substantial from a consensus perspective (and probably also from a
/// performance perspective).
const TYPICAL_SLOTS_PER_EPOCH: usize = 32;
/// For some given slot, this contains the proposer index (`index`) and the `fork` that should be
/// used to verify their signature.
pub struct Proposer {
pub index: usize,
pub fork: Fork,
}
/// The list of proposers for some given `epoch`, alongside the `fork` that should be used to verify
/// their signatures.
pub struct EpochBlockProposers {
/// The epoch to which the proposers pertain.
epoch: Epoch,
/// The fork that should be used to verify proposer signatures.
fork: Fork,
/// A list of length `T::EthSpec::slots_per_epoch()`, representing the proposers for each slot
/// in that epoch.
///
/// E.g., if `self.epoch == 1`, then `self.proposers[0]` contains the proposer for slot `32`.
proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
}
/// A cache to store the proposers for some epoch.
///
/// See the module-level documentation for more information.
pub struct BeaconProposerCache {
cache: LruCache<(Epoch, Hash256), EpochBlockProposers>,
}
impl Default for BeaconProposerCache {
fn default() -> Self {
Self {
cache: LruCache::new(CACHE_SIZE),
}
}
}
impl BeaconProposerCache {
/// If it is cached, returns the proposer for the block at `slot` where the block has the
/// ancestor block root of `shuffling_decision_block` at `end_slot(slot.epoch() - 1)`.
pub fn get<T: EthSpec>(
&mut self,
shuffling_decision_block: Hash256,
slot: Slot,
) -> Option<Proposer> {
let epoch = slot.epoch(T::slots_per_epoch());
let key = (epoch, shuffling_decision_block);
if let Some(cache) = self.cache.get(&key) {
// This `if` statement is likely unnecessary, but it feels like good practice.
if epoch == cache.epoch {
cache
.proposers
.get(slot.as_usize() % T::SlotsPerEpoch::to_usize())
.map(|&index| Proposer {
index,
fork: cache.fork,
})
} else {
None
}
} else {
None
}
}
/// Insert the proposers into the cache.
///
/// See `Self::get` for a description of `shuffling_decision_block`.
///
/// The `fork` value must be valid to verify proposer signatures in `epoch`.
pub fn insert(
&mut self,
epoch: Epoch,
shuffling_decision_block: Hash256,
proposers: Vec<usize>,
fork: Fork,
) -> Result<(), BeaconStateError> {
let key = (epoch, shuffling_decision_block);
if !self.cache.contains(&key) {
self.cache.put(
key,
EpochBlockProposers {
epoch,
fork,
proposers: proposers.into(),
},
);
}
Ok(())
}
}
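
As a usage illustration (not part of the diff), the cache is keyed by `(epoch, shuffling_decision_block)`, so a lookup only hits when both match. A minimal test-style sketch, assuming `Fork` implements `Default` and using `MainnetEthSpec`:

```rust
#[cfg(test)]
mod keying_illustration {
    use super::*;
    use types::{Epoch, EthSpec, Fork, Hash256, MainnetEthSpec};

    #[test]
    fn keyed_by_epoch_and_decision_root() {
        let slots_per_epoch = MainnetEthSpec::slots_per_epoch();
        let epoch = Epoch::new(10);
        let decision_root = Hash256::repeat_byte(0xaa); // block root at end_slot(epoch - 1)

        let mut cache = BeaconProposerCache::default();
        cache
            .insert(
                epoch,
                decision_root,
                (0..slots_per_epoch as usize).collect(), // placeholder proposer indices
                Fork::default(), // any fork value works for this illustration
            )
            .expect("insert succeeds");

        // Any slot inside `epoch` with the matching decision root is a hit.
        let slot = epoch.start_slot(slots_per_epoch) + 3;
        assert!(cache.get::<MainnetEthSpec>(decision_root, slot).is_some());

        // A different epoch, or a different decision root (i.e. another fork of
        // the chain), is a different key and therefore a miss.
        let next_epoch_slot = (epoch + 1).start_slot(slots_per_epoch);
        assert!(cache
            .get::<MainnetEthSpec>(decision_root, next_epoch_slot)
            .is_none());
        assert!(cache
            .get::<MainnetEthSpec>(Hash256::repeat_byte(0xbb), slot)
            .is_none());
    }
}
```

This is also why the trailing-edge state advance can prime the next epoch: once the decision root at the end of the current epoch is known, every proposer lookup for the following epoch becomes a cache hit (absent an unexpected fork).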

View File

@ -9,7 +9,6 @@ pub struct BeaconSnapshot<E: EthSpec> {
pub beacon_block: SignedBeaconBlock<E>,
pub beacon_block_root: Hash256,
pub beacon_state: BeaconState<E>,
pub beacon_state_root: Hash256,
}
impl<E: EthSpec> BeaconSnapshot<E> {
@ -18,28 +17,33 @@ impl<E: EthSpec> BeaconSnapshot<E> {
beacon_block: SignedBeaconBlock<E>,
beacon_block_root: Hash256,
beacon_state: BeaconState<E>,
beacon_state_root: Hash256,
) -> Self {
Self {
beacon_block,
beacon_block_root,
beacon_state,
beacon_state_root,
}
}
/// Returns the state root from `self.beacon_block`.
///
/// ## Caution
///
/// It is not strictly enforced that `root(self.beacon_state) == self.beacon_state_root()`.
pub fn beacon_state_root(&self) -> Hash256 {
self.beacon_block.message.state_root
}
/// Update all fields of the checkpoint.
pub fn update(
&mut self,
beacon_block: SignedBeaconBlock<E>,
beacon_block_root: Hash256,
beacon_state: BeaconState<E>,
beacon_state_root: Hash256,
) {
self.beacon_block = beacon_block;
self.beacon_block_root = beacon_block_root;
self.beacon_state = beacon_state;
self.beacon_state_root = beacon_state_root;
}
pub fn clone_with(&self, clone_config: CloneConfig) -> Self {
@ -47,7 +51,6 @@ impl<E: EthSpec> BeaconSnapshot<E> {
beacon_block: self.beacon_block.clone(),
beacon_block_root: self.beacon_block_root,
beacon_state: self.beacon_state.clone_with(clone_config),
beacon_state_root: self.beacon_state_root,
}
}
}

View File

@ -40,17 +40,20 @@
//! END
//!
//! ```
use crate::snapshot_cache::PreProcessingSnapshot;
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{
beacon_chain::{
BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
},
metrics, BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot,
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use fork_choice::{ForkChoice, ForkChoiceStore};
use parking_lot::RwLockReadGuard;
use slog::{error, Logger};
use proto_array::Block as ProtoBlock;
use slog::{debug, error, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use state_processing::{
@ -66,7 +69,7 @@ use std::io::Write;
use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp};
use tree_hash::TreeHash;
use types::{
BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256,
BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, Epoch, EthSpec, Hash256,
PublicKey, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};
@ -179,7 +182,7 @@ pub enum BlockError<T: EthSpec> {
/// ## Peer scoring
///
/// The block is invalid and the peer is faulty.
BlockIsNotLaterThanParent { block_slot: Slot, state_slot: Slot },
BlockIsNotLaterThanParent { block_slot: Slot, parent_slot: Slot },
/// At least one block in the chain segment did not have it's parent root set to the root of
/// the prior block.
///
@ -348,11 +351,8 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
.map(|(_, block)| block.slot())
.unwrap_or_else(|| slot);
let state = cheap_state_advance_to_obtain_committees(
&mut parent.beacon_state,
highest_slot,
&chain.spec,
)?;
let state =
cheap_state_advance_to_obtain_committees(&mut parent.pre_state, highest_slot, &chain.spec)?;
let pubkey_cache = get_validator_pubkey_cache(chain)?;
let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);
@ -388,7 +388,7 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
pub struct GossipVerifiedBlock<T: BeaconChainTypes> {
pub block: SignedBeaconBlock<T::EthSpec>,
pub block_root: Hash256,
parent: BeaconSnapshot<T::EthSpec>,
parent: Option<PreProcessingSnapshot<T::EthSpec>>,
}
/// A wrapper around a `SignedBeaconBlock` that indicates that all signatures (except the deposit
@ -396,7 +396,7 @@ pub struct GossipVerifiedBlock<T: BeaconChainTypes> {
pub struct SignatureVerifiedBlock<T: BeaconChainTypes> {
block: SignedBeaconBlock<T::EthSpec>,
block_root: Hash256,
parent: Option<BeaconSnapshot<T::EthSpec>>,
parent: Option<PreProcessingSnapshot<T::EthSpec>>,
}
/// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and
@ -520,16 +520,90 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
&chain.store,
)?;
let (mut parent, block) = load_parent(block, chain)?;
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
let (parent_block, block) = verify_parent_block_is_known(chain, block)?;
// Track the number of skip slots between the block and its parent.
metrics::set_gauge(
&metrics::GOSSIP_BEACON_BLOCK_SKIPPED_SLOTS,
block
.slot()
.as_u64()
.saturating_sub(1)
.saturating_sub(parent_block.slot.into()) as i64,
);
// Paranoid check to prevent propagation of blocks that don't form a legitimate chain.
//
// This is not in the spec, but @protolambda tells me that the majority of other clients are
// already doing it. For reference:
//
// https://github.com/ethereum/eth2.0-specs/pull/2196
if parent_block.slot >= block.slot() {
return Err(BlockError::BlockIsNotLaterThanParent {
block_slot: block.slot(),
parent_slot: parent_block.slot,
});
}
let proposer_shuffling_decision_block =
if parent_block.slot.epoch(T::EthSpec::slots_per_epoch()) == block_epoch {
parent_block
.next_epoch_shuffling_id
.shuffling_decision_block
} else {
parent_block.root
};
// Reject any block that exceeds our limit on skipped slots.
check_block_skip_slots(chain, &parent.beacon_block.message, &block.message)?;
check_block_skip_slots(chain, parent_block.slot, &block.message)?;
let state = cheap_state_advance_to_obtain_committees(
&mut parent.beacon_state,
block.slot(),
&chain.spec,
)?;
// We assign to a variable instead of using `if let Some` directly to ensure we drop the
// write lock before trying to acquire it again in the `else` clause.
let proposer_opt = chain
.beacon_proposer_cache
.lock()
.get::<T::EthSpec>(proposer_shuffling_decision_block, block.slot());
let (expected_proposer, fork, parent, block) = if let Some(proposer) = proposer_opt {
// The proposer index was cached and we can return it without needing to load the
// parent.
(proposer.index, proposer.fork, None, block)
} else {
// The proposer index was *not* cached and we must load the parent in order to determine
// the proposer index.
let (mut parent, block) = load_parent(block, chain)?;
debug!(
chain.log,
"Proposer shuffling cache miss";
"parent_root" => ?parent.beacon_block_root,
"parent_slot" => parent.beacon_block.slot(),
"block_root" => ?block_root,
"block_slot" => block.slot(),
);
// The state produced is only valid for determining proposer/attester shuffling indices.
let state = cheap_state_advance_to_obtain_committees(
&mut parent.pre_state,
block.slot(),
&chain.spec,
)?;
let proposers = state.get_beacon_proposer_indices(&chain.spec)?;
let proposer_index = *proposers
.get(block.slot().as_usize() % T::EthSpec::slots_per_epoch() as usize)
.ok_or_else(|| BeaconChainError::NoProposerForSlot(block.slot()))?;
// Prime the proposer shuffling cache with the newly-learned value.
chain.beacon_proposer_cache.lock().insert(
block_epoch,
proposer_shuffling_decision_block,
proposers,
state.fork,
)?;
(proposer_index, state.fork, Some(parent), block)
};
let signature_is_valid = {
let pubkey_cache = get_validator_pubkey_cache(chain)?;
@ -539,7 +613,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
block.verify_signature(
Some(block_root),
pubkey,
&state.fork,
&fork,
chain.genesis_validators_root,
&chain.spec,
)
@ -566,12 +640,10 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
});
}
let expected_proposer =
state.get_beacon_proposer_index(block.message.slot, &chain.spec)? as u64;
if block.message.proposer_index != expected_proposer {
if block.message.proposer_index != expected_proposer as u64 {
return Err(BlockError::IncorrectBlockProposer {
block: block.message.proposer_index,
local_shuffling: expected_proposer,
local_shuffling: expected_proposer as u64,
});
}
@ -615,12 +687,12 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
let (mut parent, block) = load_parent(block, chain)?;
// Reject any block that exceeds our limit on skipped slots.
check_block_skip_slots(chain, &parent.beacon_block.message, &block.message)?;
check_block_skip_slots(chain, parent.beacon_block.slot(), &block.message)?;
let block_root = get_block_root(&block);
let state = cheap_state_advance_to_obtain_committees(
&mut parent.beacon_state,
&mut parent.pre_state,
block.slot(),
&chain.spec,
)?;
@ -657,11 +729,14 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
from: GossipVerifiedBlock<T>,
chain: &BeaconChain<T>,
) -> Result<Self, BlockError<T::EthSpec>> {
let mut parent = from.parent;
let block = from.block;
let (mut parent, block) = if let Some(parent) = from.parent {
(parent, from.block)
} else {
load_parent(from.block, chain)?
};
let state = cheap_state_advance_to_obtain_committees(
&mut parent.beacon_state,
&mut parent.pre_state,
block.slot(),
&chain.spec,
)?;
@ -749,7 +824,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
pub fn from_signature_verified_components(
block: SignedBeaconBlock<T::EthSpec>,
block_root: Hash256,
parent: BeaconSnapshot<T::EthSpec>,
parent: PreProcessingSnapshot<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<Self, BlockError<T::EthSpec>> {
// Reject any block if its parent is not known to fork choice.
@ -771,7 +846,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
}
// Reject any block that exceeds our limit on skipped slots.
check_block_skip_slots(chain, &parent.beacon_block.message, &block.message)?;
check_block_skip_slots(chain, parent.beacon_block.slot(), &block.message)?;
/*
* Perform cursory checks to see if the block is even worth processing.
@ -790,20 +865,41 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
let mut confirmation_db_batch = vec![];
// The block must have a higher slot than its parent.
if block.slot() <= parent.beacon_state.slot {
if block.slot() <= parent.beacon_block.slot() {
return Err(BlockError::BlockIsNotLaterThanParent {
block_slot: block.slot(),
state_slot: parent.beacon_state.slot,
parent_slot: parent.beacon_block.slot(),
});
}
let mut summaries = vec![];
// Transition the parent state to the block slot.
let mut state = parent.beacon_state;
//
// It is important to note that we're using a "pre-state" here, one that has potentially
// been advanced one slot forward from `parent.beacon_block.slot`.
let mut state = parent.pre_state;
// Perform a sanity check on the pre-state.
let parent_slot = parent.beacon_block.slot();
if state.slot < parent_slot || state.slot > parent_slot + 1 {
return Err(BeaconChainError::BadPreState {
parent_root: parent.beacon_block_root,
parent_slot,
block_root,
block_slot: block.slot(),
state_slot: state.slot,
}
.into());
}
let distance = block.slot().as_u64().saturating_sub(state.slot.as_u64());
for i in 0..distance {
let state_root = if i == 0 {
for _ in 0..distance {
let state_root = if parent.beacon_block.slot() == state.slot {
// If it happens that `pre_state` has *not* already been advanced forward a single
// slot, then there is no need to compute the state root for this
// `per_slot_processing` call since that state root is already stored in the parent
// block.
parent.beacon_block.state_root()
} else {
// This is a new state we've reached, so stage it for storage in the DB.
@ -851,6 +947,24 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
expose_participation_metrics(&summaries);
// If the block is sufficiently recent, notify the validator monitor.
if let Some(slot) = chain.slot_clock.now() {
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
if block.slot().epoch(T::EthSpec::slots_per_epoch())
+ VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64
>= epoch
{
let validator_monitor = chain.validator_monitor.read();
// Update the summaries in a separate loop to `per_slot_processing`. This protects
// the `validator_monitor` lock from being bounced or held for a long time whilst
// performing `per_slot_processing`.
for (i, summary) in summaries.iter().enumerate() {
let epoch = state.current_epoch() - Epoch::from(summaries.len() - i);
validator_monitor.process_validator_statuses(epoch, &summary.statuses);
}
}
}
metrics::stop_timer(catchup_timer);
/*
@ -941,14 +1055,14 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
/// `import_max_skip_slots` value.
fn check_block_skip_slots<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
parent: &BeaconBlock<T::EthSpec>,
parent_slot: Slot,
block: &BeaconBlock<T::EthSpec>,
) -> Result<(), BlockError<T::EthSpec>> {
// Reject any block that exceeds our limit on skipped slots.
if let Some(max_skip_slots) = chain.config.import_max_skip_slots {
if block.slot > parent.slot + max_skip_slots {
if block.slot > parent_slot + max_skip_slots {
return Err(BlockError::TooManySkippedSlots {
parent_slot: parent.slot,
parent_slot,
block_slot: block.slot,
});
}
@ -1071,6 +1185,24 @@ pub fn get_block_root<E: EthSpec>(block: &SignedBeaconBlock<E>) -> Hash256 {
block_root
}
/// Verify the parent of `block` is known, returning some information about the parent block from
/// fork choice.
#[allow(clippy::type_complexity)]
fn verify_parent_block_is_known<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
block: SignedBeaconBlock<T::EthSpec>,
) -> Result<(ProtoBlock, SignedBeaconBlock<T::EthSpec>), BlockError<T::EthSpec>> {
if let Some(proto_block) = chain
.fork_choice
.read()
.get_block(&block.message.parent_root)
{
Ok((proto_block, block))
} else {
Err(BlockError::ParentUnknown(Box::new(block)))
}
}
/// Load the parent snapshot (block and state) of the given `block`.
///
/// Returns `Err(BlockError::ParentUnknown)` if the parent is not found, or if an error occurs
@ -1079,7 +1211,13 @@ pub fn get_block_root<E: EthSpec>(block: &SignedBeaconBlock<E>) -> Hash256 {
fn load_parent<T: BeaconChainTypes>(
block: SignedBeaconBlock<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<(BeaconSnapshot<T::EthSpec>, SignedBeaconBlock<T::EthSpec>), BlockError<T::EthSpec>> {
) -> Result<
(
PreProcessingSnapshot<T::EthSpec>,
SignedBeaconBlock<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
// Reject any block if its parent is not known to fork choice.
//
// A block that is not in fork choice is either:
@ -1105,7 +1243,7 @@ fn load_parent<T: BeaconChainTypes>(
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|mut snapshot_cache| snapshot_cache.try_remove(block.parent_root()))
{
Ok((snapshot, block))
Ok((snapshot.into_pre_state(), block))
} else {
// Load the blocks parent block from the database, returning invalid if that block is not
// found.
@ -1136,11 +1274,10 @@ fn load_parent<T: BeaconChainTypes>(
})?;
Ok((
BeaconSnapshot {
PreProcessingSnapshot {
beacon_block: parent_block,
beacon_block_root: root,
beacon_state: parent_state,
beacon_state_root: parent_state_root,
pre_state: parent_state,
},
block,
))
@ -1151,12 +1288,12 @@ fn load_parent<T: BeaconChainTypes>(
result
}
/// Performs a cheap (time-efficient) state advancement so the committees for `slot` can be
/// obtained from `state`.
/// Performs a cheap (time-efficient) state advancement so the committees and proposer shuffling for
/// `slot` can be obtained from `state`.
///
/// The state advancement is "cheap" since it does not generate state roots. As a result, the
/// returned state might be holistically invalid but the committees will be correct (since they do
/// not rely upon state roots).
/// returned state might be holistically invalid but the committees/proposers will be correct (since
/// they do not rely upon state roots).
///
/// If the given `state` can already serve the `slot`, the committees will be built on the `state`
/// and `Cow::Borrowed(state)` will be returned. Otherwise, the state will be cloned, cheaply
@ -1176,7 +1313,7 @@ fn cheap_state_advance_to_obtain_committees<'a, E: EthSpec>(
} else if state.slot > block_slot {
Err(BlockError::BlockIsNotLaterThanParent {
block_slot,
state_slot: state.slot,
parent_slot: state.slot,
})
} else {
let mut state = state.clone_with(CloneConfig::committee_caches_only());

View File

@ -328,7 +328,6 @@ where
let genesis = BeaconSnapshot {
beacon_block_root,
beacon_block,
beacon_state_root,
beacon_state,
};
@ -468,14 +467,9 @@ where
let mut canonical_head = BeaconSnapshot {
beacon_block_root: head_block_root,
beacon_block: head_block,
beacon_state_root: head_state_root,
beacon_state: head_state,
};
if canonical_head.beacon_block.state_root() != canonical_head.beacon_state_root {
return Err("beacon_block.state_root != beacon_state".to_string());
}
canonical_head
.beacon_state
.build_all_caches(&self.spec)
@ -560,6 +554,7 @@ where
canonical_head,
)),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
beacon_proposer_cache: <_>::default(),
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
disabled_forks: self.disabled_forks,
shutdown_sender: self
@ -599,7 +594,7 @@ where
info!(
log,
"Beacon chain initialized";
"head_state" => format!("{}", head.beacon_state_root),
"head_state" => format!("{}", head.beacon_state_root()),
"head_block" => format!("{}", head.beacon_block_root),
"head_slot" => format!("{}", head.beacon_block.slot()),
);

View File

@ -69,9 +69,11 @@ pub enum BeaconChainError {
/// Returned when an internal check fails, indicating corrupt data.
InvariantViolated(String),
SszTypesError(SszTypesError),
NoProposerForSlot(Slot),
CanonicalHeadLockTimeout,
AttestationCacheLockTimeout,
ValidatorPubkeyCacheLockTimeout,
SnapshotCacheLockTimeout,
IncorrectStateForAttestation(RelativeEpochError),
InvalidValidatorPubkeyBytes(bls::Error),
ValidatorPubkeyCacheIncomplete(usize),
@ -96,6 +98,13 @@ pub enum BeaconChainError {
head_slot: Slot,
request_slot: Slot,
},
BadPreState {
parent_root: Hash256,
parent_slot: Slot,
block_root: Hash256,
block_slot: Slot,
state_slot: Slot,
},
}
easy_from_to!(SlotProcessingError, BeaconChainError);

View File

@ -2,6 +2,7 @@
pub mod attestation_verification;
mod beacon_chain;
mod beacon_fork_choice_store;
mod beacon_proposer_cache;
mod beacon_snapshot;
mod block_verification;
pub mod builder;
@ -21,6 +22,7 @@ mod persisted_beacon_chain;
mod persisted_fork_choice;
mod shuffling_cache;
mod snapshot_cache;
pub mod state_advance_timer;
pub mod test_utils;
mod timeout_rw_lock;
pub mod validator_monitor;

View File

@ -424,6 +424,54 @@ lazy_static! {
/*
* Validator Monitor Metrics (per-epoch summaries)
*/
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_HIT: Result<IntCounterVec> =
try_create_int_counter_vec(
"validator_monitor_prev_epoch_on_chain_attester_hit",
"Incremented if the validator is flagged as a previous epoch attester \
during per epoch processing",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_MISS: Result<IntCounterVec> =
try_create_int_counter_vec(
"validator_monitor_prev_epoch_on_chain_attester_miss",
"Incremented if the validator is not flagged as a previous epoch attester \
during per epoch processing",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_HIT: Result<IntCounterVec> =
try_create_int_counter_vec(
"validator_monitor_prev_epoch_on_chain_head_attester_hit",
"Incremented if the validator is flagged as a previous epoch head attester \
during per epoch processing",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_MISS: Result<IntCounterVec> =
try_create_int_counter_vec(
"validator_monitor_prev_epoch_on_chain_head_attester_miss",
"Incremented if the validator is not flagged as a previous epoch head attester \
during per epoch processing",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_HIT: Result<IntCounterVec> =
try_create_int_counter_vec(
"validator_monitor_prev_epoch_on_chain_target_attester_hit",
"Incremented if the validator is flagged as a previous epoch target attester \
during per epoch processing",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_MISS: Result<IntCounterVec> =
try_create_int_counter_vec(
"validator_monitor_prev_epoch_on_chain_target_attester_miss",
"Incremented if the validator is not flagged as a previous epoch target attester \
during per epoch processing",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_INCLUSION_DISTANCE: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_prev_epoch_on_chain_inclusion_distance",
"The attestation inclusion distance calculated during per epoch processing",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_TOTAL: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_prev_epoch_attestations_total",
@ -575,15 +623,28 @@ lazy_static! {
*/
pub static ref BEACON_BLOCK_IMPORTED_SLOT_START_DELAY_TIME: Result<Histogram> = try_create_histogram(
"beacon_block_imported_slot_start_delay_time",
"Duration between the start of the blocks slot and the current time.",
"Duration between the start of the blocks slot and the current time when it was imported.",
);
pub static ref BEACON_BLOCK_HEAD_SLOT_START_DELAY_TIME: Result<Histogram> = try_create_histogram(
"beacon_block_head_slot_start_delay_time",
"Duration between the start of the blocks slot and the current time when it was as head.",
);
/*
* General block metrics
*/
pub static ref GOSSIP_BEACON_BLOCK_SKIPPED_SLOTS: Result<IntGauge> =
try_create_int_gauge(
"gossip_beacon_block_skipped_slots",
"For each gossip blocks, the number of skip slots between it and its parent"
);
}
/// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot,
/// head state info, etc) and update the Prometheus `DEFAULT_REGISTRY`.
pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
if let Ok(head) = beacon_chain.head() {
scrape_head_state::<T>(&head.beacon_state, head.beacon_state_root)
scrape_head_state::<T>(&head.beacon_state, head.beacon_state_root())
}
if let Some(slot) = beacon_chain.slot_clock.now() {

View File

@ -1,6 +1,6 @@
use crate::metrics;
use lru::LruCache;
use types::{beacon_state::CommitteeCache, Epoch, Hash256, ShufflingId};
use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256};
/// The size of the LRU cache that stores committee caches for quicker verification.
///
@ -14,7 +14,7 @@ const CACHE_SIZE: usize = 16;
/// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like
/// a find/replace error.
pub struct ShufflingCache {
cache: LruCache<ShufflingId, CommitteeCache>,
cache: LruCache<AttestationShufflingId, CommitteeCache>,
}
impl ShufflingCache {
@ -24,7 +24,7 @@ impl ShufflingCache {
}
}
pub fn get(&mut self, key: &ShufflingId) -> Option<&CommitteeCache> {
pub fn get(&mut self, key: &AttestationShufflingId) -> Option<&CommitteeCache> {
let opt = self.cache.get(key);
if opt.is_some() {
@ -36,11 +36,11 @@ impl ShufflingCache {
opt
}
pub fn contains(&self, key: &ShufflingId) -> bool {
pub fn contains(&self, key: &AttestationShufflingId) -> bool {
self.cache.contains(key)
}
pub fn insert(&mut self, key: ShufflingId, committee_cache: &CommitteeCache) {
pub fn insert(&mut self, key: AttestationShufflingId, committee_cache: &CommitteeCache) {
if !self.cache.contains(&key) {
self.cache.put(key, committee_cache.clone());
}
@ -49,8 +49,8 @@ impl ShufflingCache {
/// Contains the shuffling IDs for a beacon block.
pub struct BlockShufflingIds {
pub current: ShufflingId,
pub next: ShufflingId,
pub current: AttestationShufflingId,
pub next: AttestationShufflingId,
pub block_root: Hash256,
}
@ -58,13 +58,16 @@ impl BlockShufflingIds {
/// Returns the shuffling ID for the given epoch.
///
/// Returns `None` if `epoch` is prior to `self.current.shuffling_epoch`.
pub fn id_for_epoch(&self, epoch: Epoch) -> Option<ShufflingId> {
pub fn id_for_epoch(&self, epoch: Epoch) -> Option<AttestationShufflingId> {
if epoch == self.current.shuffling_epoch {
Some(self.current.clone())
} else if epoch == self.next.shuffling_epoch {
Some(self.next.clone())
} else if epoch > self.next.shuffling_epoch {
Some(ShufflingId::from_components(epoch, self.block_root))
Some(AttestationShufflingId::from_components(
epoch,
self.block_root,
))
} else {
None
}

View File

@ -1,10 +1,93 @@
use crate::BeaconSnapshot;
use std::cmp;
use types::{beacon_state::CloneConfig, Epoch, EthSpec, Hash256};
use types::{
beacon_state::CloneConfig, BeaconState, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
};
/// The default size of the cache.
pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4;
/// This snapshot is to be used for verifying a child of `self.beacon_block`.
pub struct PreProcessingSnapshot<T: EthSpec> {
/// This state is equivalent to the `self.beacon_block.state_root()` state that has been
/// advanced forward one slot using `per_slot_processing`. This state is "primed and ready" for
/// the application of another block.
pub pre_state: BeaconState<T>,
pub beacon_block: SignedBeaconBlock<T>,
pub beacon_block_root: Hash256,
}
impl<T: EthSpec> From<BeaconSnapshot<T>> for PreProcessingSnapshot<T> {
fn from(snapshot: BeaconSnapshot<T>) -> Self {
Self {
pre_state: snapshot.beacon_state,
beacon_block: snapshot.beacon_block,
beacon_block_root: snapshot.beacon_block_root,
}
}
}
impl<T: EthSpec> CacheItem<T> {
pub fn new_without_pre_state(snapshot: BeaconSnapshot<T>) -> Self {
Self {
beacon_block: snapshot.beacon_block,
beacon_block_root: snapshot.beacon_block_root,
beacon_state: snapshot.beacon_state,
pre_state: None,
}
}
fn clone_to_snapshot_with(&self, clone_config: CloneConfig) -> BeaconSnapshot<T> {
BeaconSnapshot {
beacon_state: self.beacon_state.clone_with(clone_config),
beacon_block: self.beacon_block.clone(),
beacon_block_root: self.beacon_block_root,
}
}
pub fn into_pre_state(self) -> PreProcessingSnapshot<T> {
PreProcessingSnapshot {
beacon_block: self.beacon_block,
beacon_block_root: self.beacon_block_root,
pre_state: self.pre_state.unwrap_or(self.beacon_state),
}
}
}
impl<T: EthSpec> Into<BeaconSnapshot<T>> for CacheItem<T> {
fn into(self) -> BeaconSnapshot<T> {
BeaconSnapshot {
beacon_state: self.beacon_state,
beacon_block: self.beacon_block,
beacon_block_root: self.beacon_block_root,
}
}
}
pub enum StateAdvance<T: EthSpec> {
/// The cache does not contain the supplied block root.
BlockNotFound,
/// The cache contains the supplied block root but the state has already been advanced.
AlreadyAdvanced,
/// The cache contains the supplied block root and the state has not yet been advanced.
State {
state: Box<BeaconState<T>>,
state_root: Hash256,
block_slot: Slot,
},
}
/// The item stored in the `SnapshotCache`.
pub struct CacheItem<T: EthSpec> {
beacon_block: SignedBeaconBlock<T>,
beacon_block_root: Hash256,
/// This state is equivalent to `self.beacon_block.state_root()`.
beacon_state: BeaconState<T>,
/// This state is equivalent to `self.beacon_state` that has had `per_slot_processing` applied
/// to it. This state assists in optimizing block processing.
pre_state: Option<BeaconState<T>>,
}
/// Provides a cache of `BeaconSnapshot` that is intended primarily for block processing.
///
/// ## Cache Queuing
@ -20,7 +103,7 @@ pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4;
pub struct SnapshotCache<T: EthSpec> {
max_len: usize,
head_block_root: Hash256,
snapshots: Vec<BeaconSnapshot<T>>,
snapshots: Vec<CacheItem<T>>,
}
impl<T: EthSpec> SnapshotCache<T> {
@ -31,15 +114,22 @@ impl<T: EthSpec> SnapshotCache<T> {
Self {
max_len: cmp::max(max_len, 1),
head_block_root: head.beacon_block_root,
snapshots: vec![head],
snapshots: vec![CacheItem::new_without_pre_state(head)],
}
}
/// Insert a snapshot, potentially removing an existing snapshot if `self` is at capacity (see
/// struct-level documentation for more info).
pub fn insert(&mut self, snapshot: BeaconSnapshot<T>) {
pub fn insert(&mut self, snapshot: BeaconSnapshot<T>, pre_state: Option<BeaconState<T>>) {
let item = CacheItem {
beacon_block: snapshot.beacon_block,
beacon_block_root: snapshot.beacon_block_root,
beacon_state: snapshot.beacon_state,
pre_state,
};
if self.snapshots.len() < self.max_len {
self.snapshots.push(snapshot);
self.snapshots.push(item);
} else {
let insert_at = self
.snapshots
@ -56,13 +146,13 @@ impl<T: EthSpec> SnapshotCache<T> {
.map(|(i, _slot)| i);
if let Some(i) = insert_at {
self.snapshots[i] = snapshot;
self.snapshots[i] = item;
}
}
}
/// If there is a snapshot with `block_root`, remove and return it.
pub fn try_remove(&mut self, block_root: Hash256) -> Option<BeaconSnapshot<T>> {
pub fn try_remove(&mut self, block_root: Hash256) -> Option<CacheItem<T>> {
self.snapshots
.iter()
.position(|snapshot| snapshot.beacon_block_root == block_root)
@ -78,7 +168,40 @@ impl<T: EthSpec> SnapshotCache<T> {
self.snapshots
.iter()
.find(|snapshot| snapshot.beacon_block_root == block_root)
.map(|snapshot| snapshot.clone_with(clone_config))
.map(|snapshot| snapshot.clone_to_snapshot_with(clone_config))
}
pub fn get_for_state_advance(&mut self, block_root: Hash256) -> StateAdvance<T> {
if let Some(snapshot) = self
.snapshots
.iter_mut()
.find(|snapshot| snapshot.beacon_block_root == block_root)
{
if snapshot.pre_state.is_some() {
StateAdvance::AlreadyAdvanced
} else {
let cloned = snapshot
.beacon_state
.clone_with(CloneConfig::committee_caches_only());
StateAdvance::State {
state: Box::new(std::mem::replace(&mut snapshot.beacon_state, cloned)),
state_root: snapshot.beacon_block.state_root(),
block_slot: snapshot.beacon_block.slot(),
}
}
} else {
StateAdvance::BlockNotFound
}
}
pub fn update_pre_state(&mut self, block_root: Hash256, state: BeaconState<T>) -> Option<()> {
self.snapshots
.iter_mut()
.find(|snapshot| snapshot.beacon_block_root == block_root)
.map(|snapshot| {
snapshot.pre_state = Some(state);
})
}
/// Removes all snapshots from the queue that are less than or equal to the finalized epoch.
@ -115,7 +238,6 @@ mod test {
BeaconSnapshot {
beacon_state,
beacon_state_root: Hash256::from_low_u64_be(i),
beacon_block: SignedBeaconBlock {
message: BeaconBlock::empty(&spec),
signature: generate_deterministic_keypair(0)
@ -143,7 +265,7 @@ mod test {
// Each snapshot should be one slot into an epoch, with each snapshot one epoch apart.
snapshot.beacon_state.slot = Slot::from(i * MainnetEthSpec::slots_per_epoch() + 1);
cache.insert(snapshot);
cache.insert(snapshot, None);
assert_eq!(
cache.snapshots.len(),
@ -161,7 +283,7 @@ mod test {
// 2 2
// 3 3
assert_eq!(cache.snapshots.len(), CACHE_SIZE);
cache.insert(get_snapshot(42));
cache.insert(get_snapshot(42), None);
assert_eq!(cache.snapshots.len(), CACHE_SIZE);
assert!(
@ -208,7 +330,7 @@ mod test {
// Over-fill the cache so it needs to eject some old values on insert.
for i in 0..CACHE_SIZE as u64 {
cache.insert(get_snapshot(u64::max_value() - i));
cache.insert(get_snapshot(u64::max_value() - i), None);
}
// Ensure that the new head value was not removed from the cache.

View File

@ -0,0 +1,327 @@
//! Provides a timer which runs in the tail-end of each slot and maybe advances the state of the
//! head block forward a single slot.
//!
//! This provides an optimization with the following benefits:
//!
//! 1. Removes the burden of a single, mandatory `per_slot_processing` call from the leading-edge of
//! block processing. This helps import blocks faster.
//! 2. Allows the node to learn of the shuffling for the next epoch, before the first block from
//! that epoch has arrived. This helps reduce gossip block propagation times.
//!
//! The downsides to this optimization are:
//!
//! 1. We are required to store an additional `BeaconState` for the head block. This consumes
//! memory.
//! 2. There's a possibility that the head block is never built upon, causing wasted CPU cycles.
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
use crate::{
beacon_chain::BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, snapshot_cache::StateAdvance, BeaconChain,
BeaconChainError, BeaconChainTypes,
};
use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
use state_processing::per_slot_processing;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use task_executor::TaskExecutor;
use tokio::time::sleep;
use types::{EthSpec, Hash256, Slot};
/// If the head slot is more than `MAX_ADVANCE_DISTANCE` from the current slot, then don't perform
/// the state advancement.
///
/// This avoids doing unnecessary work whilst the node is syncing or has perhaps been put to sleep
/// for some period of time.
const MAX_ADVANCE_DISTANCE: u64 = 4;
#[derive(Debug)]
enum Error {
BeaconChain(BeaconChainError),
HeadMissingFromSnapshotCache(Hash256),
MaxDistanceExceeded { current_slot: Slot, head_slot: Slot },
StateAlreadyAdvanced { block_root: Hash256 },
BadStateSlot { state_slot: Slot, block_slot: Slot },
}
impl From<BeaconChainError> for Error {
fn from(e: BeaconChainError) -> Self {
Self::BeaconChain(e)
}
}
/// Provides a simple thread-safe lock to be used for task co-ordination. Practically equivalent to
/// `Mutex<()>`.
#[derive(Clone)]
struct Lock(Arc<AtomicBool>);
impl Lock {
/// Instantiate an unlocked self.
pub fn new() -> Self {
Self(Arc::new(AtomicBool::new(false)))
}
/// Lock self, returning `true` if the lock was already set.
pub fn lock(&self) -> bool {
self.0.fetch_or(true, Ordering::SeqCst)
}
/// Unlock self.
pub fn unlock(&self) {
self.0.store(false, Ordering::SeqCst);
}
}
/// Spawns the timer described in the module-level documentation.
pub fn spawn_state_advance_timer<T: BeaconChainTypes>(
executor: TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>,
log: Logger,
) {
executor.spawn(
state_advance_timer(executor.clone(), beacon_chain, log),
"state_advance_timer",
);
}
/// Provides the timer described in the module-level documentation.
async fn state_advance_timer<T: BeaconChainTypes>(
executor: TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>,
log: Logger,
) {
let is_running = Lock::new();
let slot_clock = &beacon_chain.slot_clock;
let slot_duration = slot_clock.slot_duration();
loop {
match beacon_chain.slot_clock.duration_to_next_slot() {
Some(duration) => sleep(duration + (slot_duration / 4) * 3).await,
None => {
error!(log, "Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(slot_duration).await;
continue;
}
};
// Only start spawn the state advance task if the lock was previously free.
if !is_running.lock() {
let log = log.clone();
let beacon_chain = beacon_chain.clone();
let is_running = is_running.clone();
executor.spawn_blocking(
move || {
match advance_head(&beacon_chain, &log) {
Ok(()) => (),
Err(Error::BeaconChain(e)) => error!(
log,
"Failed to advance head state";
"error" => ?e
),
Err(Error::StateAlreadyAdvanced { block_root }) => debug!(
log,
"State already advanced on slot";
"block_root" => ?block_root
),
Err(Error::MaxDistanceExceeded {
current_slot,
head_slot,
}) => debug!(
log,
"Refused to advance head state";
"head_slot" => head_slot,
"current_slot" => current_slot,
),
other => warn!(
log,
"Did not advance head state";
"reason" => ?other
),
};
// Permit this blocking task to spawn again, next time the timer fires.
is_running.unlock();
},
"state_advance_blocking",
);
} else {
warn!(
log,
"State advance routine overloaded";
"msg" => "system resources may be overloaded"
)
}
}
}
/// Reads the `snapshot_cache` from the `beacon_chain` and attempts to take a clone of the
/// `BeaconState` of the head block. If it obtains this clone, the state will be advanced a single
/// slot then placed back in the `snapshot_cache` to be used for block verification.
///
/// See the module-level documentation for rationale.
fn advance_head<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>,
log: &Logger,
) -> Result<(), Error> {
let current_slot = beacon_chain.slot()?;
// These brackets ensure that the `head_slot` value is dropped before we run fork choice and
// potentially invalidate it.
//
// Fork-choice is not run *before* this function to avoid unnecessary calls whilst syncing.
{
let head_slot = beacon_chain.head_info()?.slot;
// Don't run this when syncing or if lagging too far behind.
if head_slot + MAX_ADVANCE_DISTANCE < current_slot {
return Err(Error::MaxDistanceExceeded {
current_slot,
head_slot,
});
}
}
// Run fork choice so we get the latest view of the head.
//
// This is useful since it's quite likely that the last time we ran fork choice was shortly
// after receiving the latest gossip block, but not necessarily after we've received the
// majority of attestations.
beacon_chain.fork_choice()?;
let head_root = beacon_chain.head_info()?.block_root;
let (head_slot, head_state_root, mut state) = match beacon_chain
.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::SnapshotCacheLockTimeout)?
.get_for_state_advance(head_root)
{
StateAdvance::AlreadyAdvanced => {
return Err(Error::StateAlreadyAdvanced {
block_root: head_root,
})
}
StateAdvance::BlockNotFound => return Err(Error::HeadMissingFromSnapshotCache(head_root)),
StateAdvance::State {
state,
state_root,
block_slot,
} => (block_slot, state_root, *state),
};
let initial_slot = state.slot;
let initial_epoch = state.current_epoch();
let state_root = if state.slot == head_slot {
Some(head_state_root)
} else {
// Protect against advancing a state more than a single slot.
//
// Advancing more than one slot without storing the intermediate state would corrupt the
// database. Future works might store temporary, intermediate states inside this function.
return Err(Error::BadStateSlot {
block_slot: head_slot,
state_slot: state.slot,
});
};
// Advance the state a single slot.
if let Some(summary) = per_slot_processing(&mut state, state_root, &beacon_chain.spec)
.map_err(BeaconChainError::from)?
{
// Only notify the validator monitor for recent blocks.
if state.current_epoch() + VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64
>= current_slot.epoch(T::EthSpec::slots_per_epoch())
{
// Potentially create logs/metrics for locally monitored validators.
beacon_chain
.validator_monitor
.read()
.process_validator_statuses(state.current_epoch(), &summary.statuses);
}
}
debug!(
log,
"Advanced head state one slot";
"head_root" => ?head_root,
"state_slot" => state.slot,
"current_slot" => current_slot,
);
// If the `pre_state` is in a later epoch than `state`, pre-emptively add the proposer
// shuffling for the next epoch into the cache.
if initial_epoch < state.current_epoch() {
debug!(
log,
"Priming proposer cache";
"head_root" => ?head_root,
"state_epoch" => state.current_epoch(),
"current_epoch" => current_slot.epoch(T::EthSpec::slots_per_epoch()),
);
beacon_chain
.beacon_proposer_cache
.lock()
.insert(
state.current_epoch(),
head_root,
state
.get_beacon_proposer_indices(&beacon_chain.spec)
.map_err(BeaconChainError::from)?,
state.fork,
)
.map_err(BeaconChainError::from)?;
}
let final_slot = state.slot;
// Insert the advanced state back into the snapshot cache.
beacon_chain
.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::SnapshotCacheLockTimeout)?
.update_pre_state(head_root, state)
.ok_or(Error::HeadMissingFromSnapshotCache(head_root))?;
let current_slot = beacon_chain.slot()?;
if final_slot <= current_slot {
warn!(
log,
"State advance too slow";
"head_root" => %head_root,
"advanced_slot" => final_slot,
"current_slot" => current_slot,
"initial_slot" => initial_slot,
"msg" => "system resources may be overloaded",
);
}
debug!(
log,
"Completed state advance";
"head_root" => ?head_root,
"advanced_slot" => final_slot,
"initial_slot" => initial_slot,
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn lock() {
let lock = Lock::new();
assert_eq!(lock.lock(), false);
assert_eq!(lock.lock(), true);
assert_eq!(lock.lock(), true);
lock.unlock();
assert_eq!(lock.lock(), false);
assert_eq!(lock.lock(), true);
}
}
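
For intuition (not part of the diff), the `sleep(duration + (slot_duration / 4) * 3)` call in the timer loop wakes the task three-quarters of the way through each slot; for example, assuming mainnet's 12-second slots:

```rust
use std::time::Duration;

fn main() {
    // Mirrors the sleep arithmetic in the timer loop, assuming 12-second slots.
    let slot_duration = Duration::from_secs(12);
    let offset_into_slot = (slot_duration / 4) * 3;
    assert_eq!(offset_into_slot, Duration::from_secs(9));
    // i.e. the state advance starts 9 s into slot `n - 1`, leaving roughly 3 s
    // to complete `per_slot_processing` before slot `n` begins.
}
```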

View File

@ -4,8 +4,9 @@
use crate::metrics;
use parking_lot::RwLock;
use slog::{crit, info, Logger};
use slog::{crit, error, info, warn, Logger};
use slot_clock::SlotClock;
use state_processing::per_epoch_processing::ValidatorStatus;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::io;
@ -325,6 +326,103 @@ impl<T: EthSpec> ValidatorMonitor<T> {
}
}
pub fn process_validator_statuses(&self, epoch: Epoch, summaries: &[ValidatorStatus]) {
for monitored_validator in self.validators.values() {
// We subtract two from the state of the epoch that generated these summaries.
//
// - One to account for it being the previous epoch.
// - One to account for the state advancing an epoch whilst generating the validator
// statuses.
let prev_epoch = epoch - 2;
if let Some(i) = monitored_validator.index {
let i = i as usize;
let id = &monitored_validator.id;
if let Some(summary) = summaries.get(i) {
if summary.is_previous_epoch_attester {
let lag = summary
.inclusion_info
.map(|i| format!("{} slot(s)", i.delay.saturating_sub(1).to_string()))
.unwrap_or_else(|| "??".to_string());
info!(
self.log,
"Previous epoch attestation success";
"inclusion_lag" => lag,
"matched_target" => summary.is_previous_epoch_target_attester,
"matched_head" => summary.is_previous_epoch_head_attester,
"epoch" => prev_epoch,
"validator" => id,
)
} else if summary.is_active_in_previous_epoch
&& !summary.is_previous_epoch_attester
{
error!(
self.log,
"Previous epoch attestation missing";
"epoch" => prev_epoch,
"validator" => id,
)
}
if summary.is_previous_epoch_attester {
metrics::inc_counter_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_HIT,
&[id],
);
} else {
metrics::inc_counter_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_MISS,
&[id],
);
}
if summary.is_previous_epoch_head_attester {
metrics::inc_counter_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_HIT,
&[id],
);
} else {
metrics::inc_counter_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_MISS,
&[id],
);
warn!(
self.log,
"Attested to an incorrect head";
"epoch" => prev_epoch,
"validator" => id,
);
}
if summary.is_previous_epoch_target_attester {
metrics::inc_counter_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_HIT,
&[id],
);
} else {
metrics::inc_counter_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_MISS,
&[id],
);
warn!(
self.log,
"Attested to an incorrect target";
"epoch" => prev_epoch,
"validator" => id,
);
}
if let Some(inclusion_info) = summary.inclusion_info {
metrics::set_int_gauge(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_INCLUSION_DISTANCE,
&[id],
inclusion_info.delay as i64,
);
}
}
}
}
}
fn get_validator_id(&self, validator_index: u64) -> Option<&str> {
self.indices
.get(&validator_index)
@ -945,9 +1043,18 @@ pub fn get_block_delay_ms<T: EthSpec, S: SlotClock>(
seen_timestamp: Duration,
block: &BeaconBlock<T>,
slot_clock: &S,
) -> Duration {
get_slot_delay_ms::<S>(seen_timestamp, block.slot, slot_clock)
}
/// Returns the delay between the start of `slot` and `seen_timestamp`.
pub fn get_slot_delay_ms<S: SlotClock>(
seen_timestamp: Duration,
slot: Slot,
slot_clock: &S,
) -> Duration {
slot_clock
.start_of(block.slot)
.start_of(slot)
.and_then(|slot_start| seen_timestamp.checked_sub(slot_start))
.unwrap_or_else(|| Duration::from_secs(0))
}

View File

@ -1706,7 +1706,7 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
for checkpoint in &chain_dump {
// Check that the tree hash of the stored state is as expected
assert_eq!(
checkpoint.beacon_state_root,
checkpoint.beacon_state_root(),
checkpoint.beacon_state.tree_hash_root(),
"tree hash of stored state is incorrect"
);
@ -1717,7 +1717,7 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
harness
.chain
.store
.get_state(&checkpoint.beacon_state_root, None)
.get_state(&checkpoint.beacon_state_root(), None)
.expect("no error")
.expect("state exists")
.slot,

View File

@ -133,7 +133,7 @@ fn iterators() {
assert_eq!(
*state_roots.first().expect("should have some state roots"),
(head.beacon_state_root, head.beacon_state.slot),
(head.beacon_state_root(), head.beacon_state.slot),
"first state root and slot should be for the head state"
);
}

View File

@ -5,6 +5,7 @@ use beacon_chain::{
builder::{BeaconChainBuilder, Witness},
eth1_chain::{CachingEth1Backend, Eth1Chain},
slot_clock::{SlotClock, SystemTimeSlotClock},
state_advance_timer::spawn_state_advance_timer,
store::{HotColdDB, ItemStore, LevelDB, StoreConfig},
BeaconChain, BeaconChainTypes, Eth1ChainBackend, ServerSentEventHandler,
};
@ -481,6 +482,12 @@ where
self.start_slasher_service()?;
}
if let Some(beacon_chain) = self.beacon_chain.as_ref() {
let state_advance_context = runtime_context.service_context("state_advance".into());
let log = state_advance_context.log().clone();
spawn_state_advance_timer(state_advance_context.executor, beacon_chain.clone(), log);
}
Ok(Client {
beacon_chain: self.beacon_chain,
network_globals: self.network_globals,

View File

@ -269,6 +269,11 @@ impl<T: BeaconChainTypes> Worker<T> {
verified_block
}
Err(BlockError::ParentUnknown(block)) => {
debug!(
self.log,
"Unknown parent for gossip block";
"root" => %block.canonical_root()
);
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
return;
}

View File

@@ -3,8 +3,8 @@ use std::marker::PhantomData;
use proto_array::{Block as ProtoBlock, ProtoArrayForkChoice};
use ssz_derive::{Decode, Encode};
use types::{
BeaconBlock, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, Hash256,
IndexedAttestation, RelativeEpoch, ShufflingId, Slot,
AttestationShufflingId, BeaconBlock, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec,
Hash256, IndexedAttestation, RelativeEpoch, Slot,
};
use crate::ForkChoiceStore;
@@ -247,10 +247,10 @@ where
let finalized_block_slot = genesis_block.slot;
let finalized_block_state_root = genesis_block.state_root;
let current_epoch_shuffling_id =
ShufflingId::new(genesis_block_root, genesis_state, RelativeEpoch::Current)
AttestationShufflingId::new(genesis_block_root, genesis_state, RelativeEpoch::Current)
.map_err(Error::BeaconStateError)?;
let next_epoch_shuffling_id =
ShufflingId::new(genesis_block_root, genesis_state, RelativeEpoch::Next)
AttestationShufflingId::new(genesis_block_root, genesis_state, RelativeEpoch::Next)
.map_err(Error::BeaconStateError)?;
let proto_array = ProtoArrayForkChoice::new(
@@ -543,10 +543,18 @@ where
root: block_root,
parent_root: Some(block.parent_root),
target_root,
current_epoch_shuffling_id: ShufflingId::new(block_root, state, RelativeEpoch::Current)
.map_err(Error::BeaconStateError)?,
next_epoch_shuffling_id: ShufflingId::new(block_root, state, RelativeEpoch::Next)
.map_err(Error::BeaconStateError)?,
current_epoch_shuffling_id: AttestationShufflingId::new(
block_root,
state,
RelativeEpoch::Current,
)
.map_err(Error::BeaconStateError)?,
next_epoch_shuffling_id: AttestationShufflingId::new(
block_root,
state,
RelativeEpoch::Next,
)
.map_err(Error::BeaconStateError)?,
state_root: block.state_root,
justified_epoch: state.current_justified_checkpoint.epoch,
finalized_epoch: state.finalized_checkpoint.epoch,

View File

@@ -4,7 +4,7 @@ mod votes;
use crate::proto_array_fork_choice::{Block, ProtoArrayForkChoice};
use serde_derive::{Deserialize, Serialize};
use types::{Epoch, Hash256, ShufflingId, Slot};
use types::{AttestationShufflingId, Epoch, Hash256, Slot};
pub use ffg_updates::*;
pub use no_votes::*;
@@ -55,7 +55,8 @@ pub struct ForkChoiceTestDefinition {
impl ForkChoiceTestDefinition {
pub fn run(self) {
let junk_shuffling_id = ShufflingId::from_components(Epoch::new(0), Hash256::zero());
let junk_shuffling_id =
AttestationShufflingId::from_components(Epoch::new(0), Hash256::zero());
let mut fork_choice = ProtoArrayForkChoice::new(
self.finalized_block_slot,
Hash256::zero(),
@@ -128,11 +129,11 @@ impl ForkChoiceTestDefinition {
parent_root: Some(parent_root),
state_root: Hash256::zero(),
target_root: Hash256::zero(),
current_epoch_shuffling_id: ShufflingId::from_components(
current_epoch_shuffling_id: AttestationShufflingId::from_components(
Epoch::new(0),
Hash256::zero(),
),
next_epoch_shuffling_id: ShufflingId::from_components(
next_epoch_shuffling_id: AttestationShufflingId::from_components(
Epoch::new(0),
Hash256::zero(),
),

View File

@@ -2,7 +2,7 @@ use crate::{error::Error, Block};
use serde_derive::{Deserialize, Serialize};
use ssz_derive::{Decode, Encode};
use std::collections::HashMap;
use types::{Epoch, Hash256, ShufflingId, Slot};
use types::{AttestationShufflingId, Epoch, Hash256, Slot};
#[derive(Clone, PartialEq, Debug, Encode, Decode, Serialize, Deserialize)]
pub struct ProtoNode {
@@ -18,8 +18,8 @@ pub struct ProtoNode {
/// The `target_root` is not necessary for `ProtoArray` either, it also just exists for upstream
/// components (namely fork choice attestation verification).
pub target_root: Hash256,
pub current_epoch_shuffling_id: ShufflingId,
pub next_epoch_shuffling_id: ShufflingId,
pub current_epoch_shuffling_id: AttestationShufflingId,
pub next_epoch_shuffling_id: AttestationShufflingId,
pub root: Hash256,
pub parent: Option<usize>,
pub justified_epoch: Epoch,

View File

@@ -4,7 +4,7 @@ use crate::ssz_container::SszContainer;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::collections::HashMap;
use types::{Epoch, Hash256, ShufflingId, Slot};
use types::{AttestationShufflingId, Epoch, Hash256, Slot};
pub const DEFAULT_PRUNE_THRESHOLD: usize = 256;
@@ -25,8 +25,8 @@ pub struct Block {
pub parent_root: Option<Hash256>,
pub state_root: Hash256,
pub target_root: Hash256,
pub current_epoch_shuffling_id: ShufflingId,
pub next_epoch_shuffling_id: ShufflingId,
pub current_epoch_shuffling_id: AttestationShufflingId,
pub next_epoch_shuffling_id: AttestationShufflingId,
pub justified_epoch: Epoch,
pub finalized_epoch: Epoch,
}
@@ -72,8 +72,8 @@ impl ProtoArrayForkChoice {
justified_epoch: Epoch,
finalized_epoch: Epoch,
finalized_root: Hash256,
current_epoch_shuffling_id: ShufflingId,
next_epoch_shuffling_id: ShufflingId,
current_epoch_shuffling_id: AttestationShufflingId,
next_epoch_shuffling_id: AttestationShufflingId,
) -> Result<Self, String> {
let mut proto_array = ProtoArray {
prune_threshold: DEFAULT_PRUNE_THRESHOLD,
@@ -349,7 +349,8 @@ mod test_compute_deltas {
let finalized_desc = Hash256::from_low_u64_be(2);
let not_finalized_desc = Hash256::from_low_u64_be(3);
let unknown = Hash256::from_low_u64_be(4);
let junk_shuffling_id = ShufflingId::from_components(Epoch::new(0), Hash256::zero());
let junk_shuffling_id =
AttestationShufflingId::from_components(Epoch::new(0), Hash256::zero());
let mut fc = ProtoArrayForkChoice::new(
genesis_slot,

View File

@@ -18,6 +18,7 @@ pub use validator_statuses::{TotalBalances, ValidatorStatus, ValidatorStatuses};
/// Provides a summary of validator participation during the epoch.
pub struct EpochProcessingSummary {
pub total_balances: TotalBalances,
pub statuses: Vec<ValidatorStatus>,
}
/// Performs per-epoch processing on some BeaconState.
@@ -65,6 +66,7 @@ pub fn per_epoch_processing<T: EthSpec>(
Ok(EpochProcessingSummary {
total_balances: validator_statuses.total_balances,
statuses: validator_statuses.statuses,
})
}

View File

@@ -527,6 +527,24 @@ impl<T: EthSpec> BeaconState<T> {
self.compute_proposer_index(&indices, &seed, spec)
}
/// Returns the beacon proposer index for each `slot` in `self.current_epoch()`.
///
/// The returned `Vec` contains one proposer index for each slot. For example, if
/// `state.current_epoch() == 1`, then `vec[0]` refers to slot `32` and `vec[1]` refers to slot
/// `33`. It will always be the case that `vec.len() == SLOTS_PER_EPOCH`.
pub fn get_beacon_proposer_indices(&self, spec: &ChainSpec) -> Result<Vec<usize>, Error> {
// Not using the cached validator indices since they are shuffled.
let indices = self.get_active_validator_indices(self.current_epoch(), spec)?;
self.current_epoch()
.slot_iter(T::slots_per_epoch())
.map(|slot| {
let seed = self.get_beacon_proposer_seed(slot, spec)?;
self.compute_proposer_index(&indices, &seed, spec)
})
.collect()
}
/// Compute the seed to use for the beacon proposer selection at the given `slot`.
///
/// Spec v0.12.1
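
The doc comment on the new `get_beacon_proposer_indices` helper above pins down a positional convention: entry `i` of the returned `Vec` is the proposer for slot `epoch_start_slot + i`. A small self-contained sketch of consuming it that way — the constant and pairing helper are illustrative, not part of the `types` crate:

```rust
// Illustrative stand-in for `T::slots_per_epoch()` on mainnet.
const SLOTS_PER_EPOCH: u64 = 32;

/// Pair each proposer index with its slot, mirroring the convention described
/// above: for epoch 1, index 0 maps to slot 32, index 1 to slot 33, and so on.
fn pair_with_slots(epoch: u64, proposers: &[usize]) -> Vec<(u64, usize)> {
    let start_slot = epoch * SLOTS_PER_EPOCH;
    proposers
        .iter()
        .enumerate()
        .map(|(offset, &proposer)| (start_slot + offset as u64, proposer))
        .collect()
}

fn main() {
    // e.g. the first three entries of a vec returned for epoch 1.
    let proposers = [7usize, 21, 3];
    let paired = pair_with_slots(1, &proposers);
    assert_eq!(paired[0], (32, 7));
    assert_eq!(paired[1], (33, 21));
    assert_eq!(paired[2], (34, 3));
}
```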

View File

@@ -86,7 +86,7 @@ pub use crate::pending_attestation::PendingAttestation;
pub use crate::proposer_slashing::ProposerSlashing;
pub use crate::relative_epoch::{Error as RelativeEpochError, RelativeEpoch};
pub use crate::selection_proof::SelectionProof;
pub use crate::shuffling_id::ShufflingId;
pub use crate::shuffling_id::AttestationShufflingId;
pub use crate::signed_aggregate_and_proof::SignedAggregateAndProof;
pub use crate::signed_beacon_block::{SignedBeaconBlock, SignedBeaconBlockHash};
pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader;

View File

@@ -15,12 +15,12 @@ use std::hash::Hash;
///
/// The struct stores exactly that 2-tuple.
#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize, Encode, Decode)]
pub struct ShufflingId {
pub struct AttestationShufflingId {
pub shuffling_epoch: Epoch,
shuffling_decision_block: Hash256,
pub shuffling_decision_block: Hash256,
}
impl ShufflingId {
impl AttestationShufflingId {
/// Using the given `state`, return the shuffling id for the shuffling at the given
/// `relative_epoch`.
///