Optimize process_attestation with active balance cache (#2560)

## Proposed Changes

Cache the total active balance for the current epoch in the `BeaconState`. Computing this value takes around 1ms, and this was negatively impacting block processing times on Prater, particularly when reconstructing states.

With a large number of attestations in each block, I saw the `process_attestations` function taking 150ms, which means that reconstructing hot states can take up to 4.65s (31 * 150ms), and reconstructing freezer states can take up to 307s (2047 * 150ms).

I opted to add the cache to the beacon state rather than computing the total active balance at the start of state processing and threading it through. Although threading it through would be simpler in a way, it would waste time, particularly during block replay, because the total active balance doesn't change for the duration of an epoch yet would be recomputed for every block. With the cache we save ~32ms for hot states, and up to 8.1s for freezer states (using `--slots-per-restore-point 8192`).
This commit is contained in:
Michael Sproul 2021-09-03 07:50:43 +00:00
parent f4aa1d8aea
commit 9c785a9b33
10 changed files with 100 additions and 24 deletions

View File

@ -28,8 +28,8 @@ use std::ptr;
use types::{ use types::{
sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, Attestation, AttesterSlashing, sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, Attestation, AttesterSlashing,
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, ForkVersion, Hash256, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, ForkVersion, Hash256,
ProposerSlashing, RelativeEpoch, SignedVoluntaryExit, Slot, SyncAggregate, ProposerSlashing, SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution,
SyncCommitteeContribution, Validator, Validator,
}; };
type SyncContributions<T> = RwLock<HashMap<SyncAggregateId, Vec<SyncCommitteeContribution<T>>>>; type SyncContributions<T> = RwLock<HashMap<SyncAggregateId, Vec<SyncCommitteeContribution<T>>>>;
@ -259,11 +259,8 @@ impl<T: EthSpec> OperationPool<T> {
let prev_epoch = state.previous_epoch(); let prev_epoch = state.previous_epoch();
let current_epoch = state.current_epoch(); let current_epoch = state.current_epoch();
let all_attestations = self.attestations.read(); let all_attestations = self.attestations.read();
let active_indices = state
.get_cached_active_validator_indices(RelativeEpoch::Current)
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
let total_active_balance = state let total_active_balance = state
.get_total_balance(active_indices, spec) .get_total_active_balance()
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?; .map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
// Split attestations for the previous & current epochs, so that we // Split attestations for the previous & current epochs, so that we
@ -1143,10 +1140,7 @@ mod release_tests {
.expect("should have valid best attestations"); .expect("should have valid best attestations");
assert_eq!(best_attestations.len(), max_attestations); assert_eq!(best_attestations.len(), max_attestations);
let active_indices = state let total_active_balance = state.get_total_active_balance().unwrap();
.get_cached_active_validator_indices(RelativeEpoch::Current)
.unwrap();
let total_active_balance = state.get_total_balance(active_indices, spec).unwrap();
// Set of indices covered by previous attestations in `best_attestations`. // Set of indices covered by previous attestations in `best_attestations`.
let mut seen_indices = BTreeSet::new(); let mut seen_indices = BTreeSet::new();

View File

@ -300,6 +300,7 @@ macro_rules! impl_try_into_beacon_state {
finalized_checkpoint: $inner.finalized_checkpoint, finalized_checkpoint: $inner.finalized_checkpoint,
// Caching // Caching
total_active_balance: <_>::default(),
committee_caches: <_>::default(), committee_caches: <_>::default(),
pubkey_cache: <_>::default(), pubkey_cache: <_>::default(),
exit_cache: <_>::default(), exit_cache: <_>::default(),

View File

@ -42,7 +42,7 @@ pub fn process_sync_aggregate<T: EthSpec>(
} }
// Compute participant and proposer rewards // Compute participant and proposer rewards
let total_active_balance = state.get_total_active_balance(spec)?; let total_active_balance = state.get_total_active_balance()?;
let total_active_increments = let total_active_increments =
total_active_balance.safe_div(spec.effective_balance_increment)?; total_active_balance.safe_div(spec.effective_balance_increment)?;
let total_base_rewards = get_base_reward_per_increment(total_active_balance, spec)? let total_base_rewards = get_base_reward_per_increment(total_active_balance, spec)?

View File

@ -127,7 +127,7 @@ pub mod altair {
get_attestation_participation_flag_indices(state, data, inclusion_delay, spec)?; get_attestation_participation_flag_indices(state, data, inclusion_delay, spec)?;
// Update epoch participation flags. // Update epoch participation flags.
let total_active_balance = state.get_total_active_balance(spec)?; let total_active_balance = state.get_total_active_balance()?;
let mut proposer_reward_numerator = 0; let mut proposer_reward_numerator = 0;
for index in &indexed_attestation.attesting_indices { for index in &indexed_attestation.attesting_indices {
let index = *index as usize; let index = *index as usize;

View File

@ -72,7 +72,7 @@ pub fn process_epoch<T: EthSpec>(
process_sync_committee_updates(state, spec)?; process_sync_committee_updates(state, spec)?;
// Rotate the epoch caches to suit the epoch transition. // Rotate the epoch caches to suit the epoch transition.
state.advance_caches()?; state.advance_caches(spec)?;
Ok(EpochProcessingSummary::Altair { Ok(EpochProcessingSummary::Altair {
participation_cache, participation_cache,

View File

@ -66,7 +66,7 @@ pub fn process_epoch<T: EthSpec>(
process_participation_record_updates(state)?; process_participation_record_updates(state)?;
// Rotate the epoch caches to suit the epoch transition. // Rotate the epoch caches to suit the epoch transition.
state.advance_caches()?; state.advance_caches(spec)?;
Ok(EpochProcessingSummary::Base { Ok(EpochProcessingSummary::Base {
total_balances: validator_statuses.total_balances, total_balances: validator_statuses.total_balances,

View File

@ -100,6 +100,7 @@ pub fn upgrade_to_altair<E: EthSpec>(
current_sync_committee: temp_sync_committee.clone(), // not read current_sync_committee: temp_sync_committee.clone(), // not read
next_sync_committee: temp_sync_committee, // not read next_sync_committee: temp_sync_committee, // not read
// Caches // Caches
total_active_balance: pre.total_active_balance,
committee_caches: mem::take(&mut pre.committee_caches), committee_caches: mem::take(&mut pre.committee_caches),
pubkey_cache: mem::take(&mut pre.pubkey_cache), pubkey_cache: mem::take(&mut pre.pubkey_cache),
exit_cache: mem::take(&mut pre.exit_cache), exit_cache: mem::take(&mut pre.exit_cache),

View File

@ -86,6 +86,11 @@ pub enum Error {
}, },
PreviousCommitteeCacheUninitialized, PreviousCommitteeCacheUninitialized,
CurrentCommitteeCacheUninitialized, CurrentCommitteeCacheUninitialized,
TotalActiveBalanceCacheUninitialized,
TotalActiveBalanceCacheInconsistent {
initialized_epoch: Epoch,
current_epoch: Epoch,
},
RelativeEpochError(RelativeEpochError), RelativeEpochError(RelativeEpochError),
ExitCacheUninitialized, ExitCacheUninitialized,
CommitteeCacheUninitialized(Option<RelativeEpoch>), CommitteeCacheUninitialized(Option<RelativeEpoch>),
@ -275,6 +280,13 @@ where
#[tree_hash(skip_hashing)] #[tree_hash(skip_hashing)]
#[test_random(default)] #[test_random(default)]
#[derivative(Clone(clone_with = "clone_default"))] #[derivative(Clone(clone_with = "clone_default"))]
pub total_active_balance: Option<(Epoch, u64)>,
#[serde(skip_serializing, skip_deserializing)]
#[ssz(skip_serializing)]
#[ssz(skip_deserializing)]
#[tree_hash(skip_hashing)]
#[test_random(default)]
#[derivative(Clone(clone_with = "clone_default"))]
pub committee_caches: [CommitteeCache; CACHED_EPOCHS], pub committee_caches: [CommitteeCache; CACHED_EPOCHS],
#[serde(skip_serializing, skip_deserializing)] #[serde(skip_serializing, skip_deserializing)]
#[ssz(skip_serializing)] #[ssz(skip_serializing)]
@ -353,6 +365,7 @@ impl<T: EthSpec> BeaconState<T> {
finalized_checkpoint: Checkpoint::default(), finalized_checkpoint: Checkpoint::default(),
// Caching (not in spec) // Caching (not in spec)
total_active_balance: None,
committee_caches: [ committee_caches: [
CommitteeCache::default(), CommitteeCache::default(),
CommitteeCache::default(), CommitteeCache::default(),
@ -1226,12 +1239,45 @@ impl<T: EthSpec> BeaconState<T> {
} }
/// Implementation of `get_total_active_balance`, matching the spec. /// Implementation of `get_total_active_balance`, matching the spec.
pub fn get_total_active_balance(&self, spec: &ChainSpec) -> Result<u64, Error> { ///
/// Requires the total active balance cache to be initialised, which is initialised whenever
/// the current committee cache is.
///
/// Returns minimum `EFFECTIVE_BALANCE_INCREMENT`, to avoid div by 0.
pub fn get_total_active_balance(&self) -> Result<u64, Error> {
let (initialized_epoch, balance) = self
.total_active_balance()
.ok_or(Error::TotalActiveBalanceCacheUninitialized)?;
let current_epoch = self.current_epoch();
if initialized_epoch == current_epoch {
Ok(balance)
} else {
Err(Error::TotalActiveBalanceCacheInconsistent {
initialized_epoch,
current_epoch,
})
}
}
/// Build the total active balance cache.
///
/// This function requires the current committee cache to be already built. It is called
/// automatically when `build_committee_cache` is called for the current epoch.
fn build_total_active_balance_cache(&mut self, spec: &ChainSpec) -> Result<(), Error> {
// Order is irrelevant, so use the cached indices. // Order is irrelevant, so use the cached indices.
self.get_total_balance( let current_epoch = self.current_epoch();
let total_active_balance = self.get_total_balance(
self.get_cached_active_validator_indices(RelativeEpoch::Current)?, self.get_cached_active_validator_indices(RelativeEpoch::Current)?,
spec, spec,
) )?;
*self.total_active_balance_mut() = Some((current_epoch, total_active_balance));
Ok(())
}
/// Set the cached total active balance to `None`, representing no known value.
pub fn drop_total_active_balance_cache(&mut self) {
*self.total_active_balance_mut() = None;
} }
/// Get a mutable reference to the epoch participation flags for `epoch`. /// Get a mutable reference to the epoch participation flags for `epoch`.
@ -1294,6 +1340,7 @@ impl<T: EthSpec> BeaconState<T> {
/// Drop all caches on the state. /// Drop all caches on the state.
pub fn drop_all_caches(&mut self) -> Result<(), Error> { pub fn drop_all_caches(&mut self) -> Result<(), Error> {
self.drop_total_active_balance_cache();
self.drop_committee_cache(RelativeEpoch::Previous)?; self.drop_committee_cache(RelativeEpoch::Previous)?;
self.drop_committee_cache(RelativeEpoch::Current)?; self.drop_committee_cache(RelativeEpoch::Current)?;
self.drop_committee_cache(RelativeEpoch::Next)?; self.drop_committee_cache(RelativeEpoch::Next)?;
@ -1323,11 +1370,14 @@ impl<T: EthSpec> BeaconState<T> {
.committee_cache_at_index(i)? .committee_cache_at_index(i)?
.is_initialized_at(relative_epoch.into_epoch(self.current_epoch())); .is_initialized_at(relative_epoch.into_epoch(self.current_epoch()));
if is_initialized { if !is_initialized {
Ok(()) self.force_build_committee_cache(relative_epoch, spec)?;
} else {
self.force_build_committee_cache(relative_epoch, spec)
} }
if self.total_active_balance().is_none() && relative_epoch == RelativeEpoch::Current {
self.build_total_active_balance_cache(spec)?;
}
Ok(())
} }
/// Always builds the previous epoch cache, even if it is already initialized. /// Always builds the previous epoch cache, even if it is already initialized.
@ -1359,10 +1409,36 @@ impl<T: EthSpec> BeaconState<T> {
/// ///
/// This should be used if the `slot` of this state is advanced beyond an epoch boundary. /// This should be used if the `slot` of this state is advanced beyond an epoch boundary.
/// ///
/// Note: whilst this function will preserve already-built caches, it will not build any. /// Note: this function will not build any new committee caches, but will build the total
pub fn advance_caches(&mut self) -> Result<(), Error> { /// balance cache if the (new) current epoch cache is initialized.
pub fn advance_caches(&mut self, spec: &ChainSpec) -> Result<(), Error> {
self.committee_caches_mut().rotate_left(1); self.committee_caches_mut().rotate_left(1);
// Re-compute total active balance for current epoch.
//
// This can only be computed once the state's effective balances have been updated
// for the current epoch. I.e. it is not possible to know this value with the same
// lookahead as the committee shuffling.
let curr = Self::committee_cache_index(RelativeEpoch::Current);
let curr_cache = mem::take(self.committee_cache_at_index_mut(curr)?);
// If current epoch cache is initialized, compute the total active balance from its
// indices. We check that the cache is initialized at the _next_ epoch because the slot has
// not yet been advanced.
let new_current_epoch = self.next_epoch()?;
if curr_cache.is_initialized_at(new_current_epoch) {
*self.total_active_balance_mut() = Some((
new_current_epoch,
self.get_total_balance(curr_cache.active_validator_indices(), spec)?,
));
}
// If the cache is not initialized, then the previous cached value for the total balance is
// wrong, so delete it.
else {
self.drop_total_active_balance_cache();
}
*self.committee_cache_at_index_mut(curr)? = curr_cache;
let next = Self::committee_cache_index(RelativeEpoch::Next); let next = Self::committee_cache_index(RelativeEpoch::Next);
*self.committee_cache_at_index_mut(next)? = CommitteeCache::default(); *self.committee_cache_at_index_mut(next)? = CommitteeCache::default();
Ok(()) Ok(())
@ -1504,6 +1580,7 @@ impl<T: EthSpec> BeaconState<T> {
}; };
if config.committee_caches { if config.committee_caches {
*res.committee_caches_mut() = self.committee_caches().clone(); *res.committee_caches_mut() = self.committee_caches().clone();
*res.total_active_balance_mut() = *self.total_active_balance();
} }
if config.pubkey_cache { if config.pubkey_cache {
*res.pubkey_cache_mut() = self.pubkey_cache().clone(); *res.pubkey_cache_mut() = self.pubkey_cache().clone();

View File

@ -165,6 +165,9 @@ fn test_clone_config<E: EthSpec>(base_state: &BeaconState<E>, clone_config: Clon
state state
.committee_cache(RelativeEpoch::Next) .committee_cache(RelativeEpoch::Next)
.expect("committee cache exists"); .expect("committee cache exists");
state
.total_active_balance()
.expect("total active balance exists");
} else { } else {
state state
.committee_cache(RelativeEpoch::Previous) .committee_cache(RelativeEpoch::Previous)

View File

@ -123,7 +123,7 @@ impl<E: EthSpec> Case for RewardsTest<E> {
Ok(convert_all_base_deltas(&deltas)) Ok(convert_all_base_deltas(&deltas))
} else { } else {
let total_active_balance = state.get_total_active_balance(spec)?; let total_active_balance = state.get_total_active_balance()?;
let source_deltas = compute_altair_flag_deltas( let source_deltas = compute_altair_flag_deltas(
&state, &state,