Optimise per_epoch_processing low-hanging-fruit (#3254)

## Issue Addressed

NA

## Proposed Changes

- Uses a `Vec` in `SingleEpochParticipationCache` rather than `HashMap` to speed up processing times at the cost of memory usage.
- Cache the result of `integer_sqrt` rather than recomputing for each validator.
- Cache `state.previous_epoch` rather than recomputing it for each validator.

### Benchmarks

Benchmarks on a recent mainnet state using #3252 to get timing.

#### Without this PR

```
lcli skip-slots --state-path /tmp/state-0x3cdc.ssz --partial-state-advance --slots 32 --state-root 0x3cdc33cd02713d8d6cc33a6dbe2d3a5bf9af1d357de0d175a403496486ff845e --runs 10
[2022-06-09T08:21:02Z INFO  lcli::skip_slots] Using mainnet spec
[2022-06-09T08:21:02Z INFO  lcli::skip_slots] Advancing 32 slots
[2022-06-09T08:21:02Z INFO  lcli::skip_slots] Doing 10 runs
[2022-06-09T08:21:02Z INFO  lcli::skip_slots] State path: "/tmp/state-0x3cdc.ssz"
SSZ decoding /tmp/state-0x3cdc.ssz: 43ms
[2022-06-09T08:21:03Z INFO  lcli::skip_slots] Run 0: 245.718794ms
[2022-06-09T08:21:03Z INFO  lcli::skip_slots] Run 1: 245.364782ms
[2022-06-09T08:21:03Z INFO  lcli::skip_slots] Run 2: 255.866179ms
[2022-06-09T08:21:04Z INFO  lcli::skip_slots] Run 3: 243.838909ms
[2022-06-09T08:21:04Z INFO  lcli::skip_slots] Run 4: 250.431425ms
[2022-06-09T08:21:04Z INFO  lcli::skip_slots] Run 5: 248.68765ms
[2022-06-09T08:21:04Z INFO  lcli::skip_slots] Run 6: 262.051113ms
[2022-06-09T08:21:05Z INFO  lcli::skip_slots] Run 7: 264.293967ms
[2022-06-09T08:21:05Z INFO  lcli::skip_slots] Run 8: 293.202007ms
[2022-06-09T08:21:05Z INFO  lcli::skip_slots] Run 9: 264.552017ms
```

#### With this PR:

```
lcli skip-slots --state-path /tmp/state-0x3cdc.ssz --partial-state-advance --slots 32 --state-root 0x3cdc33cd02713d8d6cc33a6dbe2d3a5bf9af1d357de0d175a403496486ff845e --runs 10
[2022-06-09T08:57:59Z INFO  lcli::skip_slots] Run 0: 73.898678ms
[2022-06-09T08:57:59Z INFO  lcli::skip_slots] Run 1: 75.536978ms
[2022-06-09T08:57:59Z INFO  lcli::skip_slots] Run 2: 75.176104ms
[2022-06-09T08:57:59Z INFO  lcli::skip_slots] Run 3: 76.460828ms
[2022-06-09T08:57:59Z INFO  lcli::skip_slots] Run 4: 75.904195ms
[2022-06-09T08:58:00Z INFO  lcli::skip_slots] Run 5: 75.53077ms
[2022-06-09T08:58:00Z INFO  lcli::skip_slots] Run 6: 74.745572ms
[2022-06-09T08:58:00Z INFO  lcli::skip_slots] Run 7: 75.823489ms
[2022-06-09T08:58:00Z INFO  lcli::skip_slots] Run 8: 74.892055ms
[2022-06-09T08:58:00Z INFO  lcli::skip_slots] Run 9: 76.333569ms
```

## Additional Info

NA
This commit is contained in:
Paul Hauner 2022-06-10 04:29:28 +00:00
parent 1d016a83f2
commit 11d80a6a38
10 changed files with 90 additions and 46 deletions

View File

@ -90,6 +90,8 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {
let att_participation_flags =
get_attestation_participation_flag_indices(state, &att.data, inclusion_delay, spec)
.ok()?;
let base_reward_per_increment =
altair::BaseRewardPerIncrement::new(total_active_balance, spec).ok()?;
let fresh_validators_rewards = attesting_indices
.iter()
@ -98,7 +100,7 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {
let participation = participation_list.get(index)?;
let base_reward =
altair::get_base_reward(state, index, total_active_balance, spec).ok()?;
altair::get_base_reward(state, index, base_reward_per_increment, spec).ok()?;
for (flag_index, weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() {
if att_participation_flags.contains(&flag_index)

View File

@ -2,27 +2,45 @@ use integer_sqrt::IntegerSquareRoot;
use safe_arith::{ArithError, SafeArith};
use types::*;
/// This type exists to avoid confusing `total_active_balance` with `base_reward_per_increment`,
/// since they are used in close proximity and the same type (`u64`).
#[derive(Copy, Clone)]
pub struct BaseRewardPerIncrement(u64);
impl BaseRewardPerIncrement {
pub fn new(total_active_balance: u64, spec: &ChainSpec) -> Result<Self, ArithError> {
get_base_reward_per_increment(total_active_balance, spec).map(Self)
}
pub fn as_u64(&self) -> u64 {
self.0
}
}
/// Returns the base reward for some validator.
///
/// The function has a different interface to the spec since it accepts the
/// `base_reward_per_increment` without computing it each time. Avoiding the re computation has
/// shown to be a significant optimisation.
///
/// Spec v1.1.0
pub fn get_base_reward<T: EthSpec>(
state: &BeaconState<T>,
index: usize,
// Should be == get_total_active_balance(state, spec)
total_active_balance: u64,
base_reward_per_increment: BaseRewardPerIncrement,
spec: &ChainSpec,
) -> Result<u64, Error> {
state
.get_effective_balance(index)?
.safe_div(spec.effective_balance_increment)?
.safe_mul(get_base_reward_per_increment(total_active_balance, spec)?)
.safe_mul(base_reward_per_increment.as_u64())
.map_err(Into::into)
}
/// Returns the base reward for some validator.
///
/// Spec v1.1.0
pub fn get_base_reward_per_increment(
fn get_base_reward_per_increment(
total_active_balance: u64,
spec: &ChainSpec,
) -> Result<u64, ArithError> {

View File

@ -1,4 +1,4 @@
use crate::common::{altair::get_base_reward_per_increment, decrease_balance, increase_balance};
use crate::common::{altair::BaseRewardPerIncrement, decrease_balance, increase_balance};
use crate::per_block_processing::errors::{BlockProcessingError, SyncAggregateInvalid};
use crate::{signature_sets::sync_aggregate_signature_set, VerifySignatures};
use safe_arith::SafeArith;
@ -72,7 +72,8 @@ pub fn compute_sync_aggregate_rewards<T: EthSpec>(
let total_active_balance = state.get_total_active_balance()?;
let total_active_increments =
total_active_balance.safe_div(spec.effective_balance_increment)?;
let total_base_rewards = get_base_reward_per_increment(total_active_balance, spec)?
let total_base_rewards = BaseRewardPerIncrement::new(total_active_balance, spec)?
.as_u64()
.safe_mul(total_active_increments)?;
let max_participant_rewards = total_base_rewards
.safe_mul(SYNC_REWARD_WEIGHT)?

View File

@ -1,7 +1,8 @@
use super::*;
use crate::common::{
altair::get_base_reward, get_attestation_participation_flag_indices, increase_balance,
initiate_validator_exit, slash_validator,
altair::{get_base_reward, BaseRewardPerIncrement},
get_attestation_participation_flag_indices, increase_balance, initiate_validator_exit,
slash_validator,
};
use crate::per_block_processing::errors::{BlockProcessingError, IntoWithIndex};
use crate::VerifySignatures;
@ -128,6 +129,7 @@ pub mod altair {
// Update epoch participation flags.
let total_active_balance = state.get_total_active_balance()?;
let base_reward_per_increment = BaseRewardPerIncrement::new(total_active_balance, spec)?;
let mut proposer_reward_numerator = 0;
for index in &indexed_attestation.attesting_indices {
let index = *index as usize;
@ -143,7 +145,7 @@ pub mod altair {
{
validator_participation.add_flag(flag_index)?;
proposer_reward_numerator.safe_add_assign(
get_base_reward(state, index, total_active_balance, spec)?
get_base_reward(state, index, base_reward_per_increment, spec)?
.safe_mul(weight)?,
)?;
}

View File

@ -14,6 +14,7 @@ pub fn process_inactivity_updates<T: EthSpec>(
participation_cache: &ParticipationCache,
spec: &ChainSpec,
) -> Result<(), EpochProcessingError> {
let previous_epoch = state.previous_epoch();
// Score updates based on previous epoch participation, skip genesis epoch
if state.current_epoch() == T::genesis_epoch() {
return Ok(());
@ -33,7 +34,7 @@ pub fn process_inactivity_updates<T: EthSpec>(
.safe_add_assign(spec.inactivity_score_bias)?;
}
// Decrease the score of all validators for forgiveness when not during a leak
if !state.is_in_inactivity_leak(spec) {
if !state.is_in_inactivity_leak(previous_epoch, spec) {
let inactivity_score = state.get_inactivity_score_mut(index)?;
inactivity_score
.safe_sub_assign(min(spec.inactivity_score_recovery_rate, *inactivity_score))?;

View File

@ -12,7 +12,6 @@
//! to get useful summaries about the validator participation in an epoch.
use safe_arith::{ArithError, SafeArith};
use std::collections::HashMap;
use types::{
consts::altair::{
NUM_FLAG_INDICES, TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX,
@ -24,6 +23,7 @@ use types::{
#[derive(Debug, PartialEq)]
pub enum Error {
InvalidFlagIndex(usize),
InvalidValidatorIndex(usize),
}
/// A balance which will never be below the specified `minimum`.
@ -64,7 +64,7 @@ struct SingleEpochParticipationCache {
/// It would be ideal to maintain a reference to the `BeaconState` here rather than copying the
/// `ParticipationFlags`, however that would cause us to run into mutable reference limitations
/// upstream.
unslashed_participating_indices: HashMap<usize, ParticipationFlags>,
unslashed_participating_indices: Vec<Option<ParticipationFlags>>,
/// Stores the sum of the balances for all validators in `self.unslashed_participating_indices`
/// for all flags in `NUM_FLAG_INDICES`.
///
@ -76,11 +76,12 @@ struct SingleEpochParticipationCache {
}
impl SingleEpochParticipationCache {
fn new(hashmap_len: usize, spec: &ChainSpec) -> Self {
fn new<T: EthSpec>(state: &BeaconState<T>, spec: &ChainSpec) -> Self {
let num_validators = state.validators().len();
let zero_balance = Balance::zero(spec.effective_balance_increment);
Self {
unslashed_participating_indices: HashMap::with_capacity(hashmap_len),
unslashed_participating_indices: vec![None; num_validators],
total_flag_balances: [zero_balance; NUM_FLAG_INDICES],
total_active_balance: zero_balance,
}
@ -100,7 +101,11 @@ impl SingleEpochParticipationCache {
///
/// May return an error if `flag_index` is out-of-bounds.
fn has_flag(&self, val_index: usize, flag_index: usize) -> Result<bool, Error> {
if let Some(participation_flags) = self.unslashed_participating_indices.get(&val_index) {
let participation_flags = self
.unslashed_participating_indices
.get(val_index)
.ok_or(Error::InvalidValidatorIndex(val_index))?;
if let Some(participation_flags) = participation_flags {
participation_flags
.has_flag(flag_index)
.map_err(|_| Error::InvalidFlagIndex(flag_index))
@ -121,13 +126,14 @@ impl SingleEpochParticipationCache {
&mut self,
val_index: usize,
state: &BeaconState<T>,
current_epoch: Epoch,
relative_epoch: RelativeEpoch,
) -> Result<(), BeaconStateError> {
let val_balance = state.get_effective_balance(val_index)?;
let validator = state.get_validator(val_index)?;
// Sanity check to ensure the validator is active.
let epoch = relative_epoch.into_epoch(state.current_epoch());
let epoch = relative_epoch.into_epoch(current_epoch);
if !validator.is_active_at(epoch) {
return Err(BeaconStateError::ValidatorIsInactive { val_index });
}
@ -149,8 +155,10 @@ impl SingleEpochParticipationCache {
}
// Add their `ParticipationFlags` to the map.
self.unslashed_participating_indices
.insert(val_index, *epoch_participation);
*self
.unslashed_participating_indices
.get_mut(val_index)
.ok_or(BeaconStateError::UnknownValidator(val_index))? = Some(*epoch_participation);
// Iterate through all the flags and increment the total flag balances for whichever flags
// are set for `val_index`.
@ -190,19 +198,10 @@ impl ParticipationCache {
let current_epoch = state.current_epoch();
let previous_epoch = state.previous_epoch();
let num_previous_epoch_active_vals = state
.get_cached_active_validator_indices(RelativeEpoch::Previous)?
.len();
let num_current_epoch_active_vals = state
.get_cached_active_validator_indices(RelativeEpoch::Current)?
.len();
// Both the current/previous epoch participations are set to a capacity that is slightly
// larger than required. The difference will be due slashed-but-active validators.
let mut current_epoch_participation =
SingleEpochParticipationCache::new(num_current_epoch_active_vals, spec);
let mut previous_epoch_participation =
SingleEpochParticipationCache::new(num_previous_epoch_active_vals, spec);
let mut current_epoch_participation = SingleEpochParticipationCache::new(state, spec);
let mut previous_epoch_participation = SingleEpochParticipationCache::new(state, spec);
// Contains the set of validators which are either:
//
// - Active in the previous epoch.
@ -224,6 +223,7 @@ impl ParticipationCache {
current_epoch_participation.process_active_validator(
val_index,
state,
current_epoch,
RelativeEpoch::Current,
)?;
}
@ -232,13 +232,14 @@ impl ParticipationCache {
previous_epoch_participation.process_active_validator(
val_index,
state,
current_epoch,
RelativeEpoch::Previous,
)?;
}
// Note: a validator might still be "eligible" whilst returning `false` to
// `Validator::is_active_at`.
if state.is_eligible_validator(val_index)? {
if state.is_eligible_validator(previous_epoch, val_index)? {
eligible_indices.push(val_index)
}
}
@ -313,16 +314,20 @@ impl ParticipationCache {
* Active/Unslashed
*/
pub fn is_active_unslashed_in_previous_epoch(&self, val_index: usize) -> bool {
/// Returns `None` for an unknown `val_index`.
pub fn is_active_unslashed_in_previous_epoch(&self, val_index: usize) -> Option<bool> {
self.previous_epoch_participation
.unslashed_participating_indices
.contains_key(&val_index)
.get(val_index)
.map(|flags| flags.is_some())
}
pub fn is_active_unslashed_in_current_epoch(&self, val_index: usize) -> bool {
/// Returns `None` for an unknown `val_index`.
pub fn is_active_unslashed_in_current_epoch(&self, val_index: usize) -> Option<bool> {
self.current_epoch_participation
.unslashed_participating_indices
.contains_key(&val_index)
.get(val_index)
.map(|flags| flags.is_some())
}
/*

View File

@ -6,7 +6,10 @@ use types::consts::altair::{
};
use types::{BeaconState, ChainSpec, EthSpec};
use crate::common::{altair::get_base_reward, decrease_balance, increase_balance};
use crate::common::{
altair::{get_base_reward, BaseRewardPerIncrement},
decrease_balance, increase_balance,
};
use crate::per_epoch_processing::{Delta, Error};
/// Apply attester and proposer rewards.
@ -67,13 +70,14 @@ pub fn get_flag_index_deltas<T: EthSpec>(
let unslashed_participating_increments =
unslashed_participating_balance.safe_div(spec.effective_balance_increment)?;
let active_increments = total_active_balance.safe_div(spec.effective_balance_increment)?;
let base_reward_per_increment = BaseRewardPerIncrement::new(total_active_balance, spec)?;
for &index in participation_cache.eligible_validator_indices() {
let base_reward = get_base_reward(state, index, total_active_balance, spec)?;
let base_reward = get_base_reward(state, index, base_reward_per_increment, spec)?;
let mut delta = Delta::default();
if unslashed_participating_indices.contains(index as usize)? {
if !state.is_in_inactivity_leak(spec) {
if !state.is_in_inactivity_leak(previous_epoch, spec) {
let reward_numerator = base_reward
.safe_mul(weight)?
.safe_mul(unslashed_participating_increments)?;

View File

@ -78,6 +78,7 @@ pub fn get_attestation_deltas<T: EthSpec>(
validator_statuses: &ValidatorStatuses,
spec: &ChainSpec,
) -> Result<Vec<AttestationDelta>, Error> {
let previous_epoch = state.previous_epoch();
let finality_delay = state
.previous_epoch()
.safe_sub(state.finalized_checkpoint().epoch)?
@ -92,7 +93,7 @@ pub fn get_attestation_deltas<T: EthSpec>(
// `get_inclusion_delay_deltas`. It's safe to do so here because any validator that is in
// the unslashed indices of the matching source attestations is active, and therefore
// eligible.
if !state.is_eligible_validator(index)? {
if !state.is_eligible_validator(previous_epoch, index)? {
continue;
}

View File

@ -101,7 +101,9 @@ impl<T: EthSpec> EpochProcessingSummary<T> {
EpochProcessingSummary::Altair {
participation_cache,
..
} => participation_cache.is_active_unslashed_in_current_epoch(val_index),
} => participation_cache
.is_active_unslashed_in_current_epoch(val_index)
.unwrap_or(false),
}
}
@ -197,7 +199,9 @@ impl<T: EthSpec> EpochProcessingSummary<T> {
EpochProcessingSummary::Altair {
participation_cache,
..
} => participation_cache.is_active_unslashed_in_previous_epoch(val_index),
} => participation_cache
.is_active_unslashed_in_previous_epoch(val_index)
.unwrap_or(false),
}
}

View File

@ -1602,17 +1602,23 @@ impl<T: EthSpec> BeaconState<T> {
self.clone_with(CloneConfig::committee_caches_only())
}
pub fn is_eligible_validator(&self, val_index: usize) -> Result<bool, Error> {
let previous_epoch = self.previous_epoch();
/// Passing `previous_epoch` to this function rather than computing it internally provides
/// a tangible speed improvement in state processing.
pub fn is_eligible_validator(
&self,
previous_epoch: Epoch,
val_index: usize,
) -> Result<bool, Error> {
self.get_validator(val_index).map(|val| {
val.is_active_at(previous_epoch)
|| (val.slashed && previous_epoch + Epoch::new(1) < val.withdrawable_epoch)
})
}
pub fn is_in_inactivity_leak(&self, spec: &ChainSpec) -> bool {
(self.previous_epoch() - self.finalized_checkpoint().epoch)
> spec.min_epochs_to_inactivity_penalty
/// Passing `previous_epoch` to this function rather than computing it internally provides
/// a tangible speed improvement in state processing.
pub fn is_in_inactivity_leak(&self, previous_epoch: Epoch, spec: &ChainSpec) -> bool {
(previous_epoch - self.finalized_checkpoint().epoch) > spec.min_epochs_to_inactivity_penalty
}
/// Get the `SyncCommittee` associated with the next slot. Useful because sync committees