Optimise balances cache in case of skipped slots (#2849)

## Proposed Changes

Remove the `is_first_block_in_epoch` logic from the balances cache update logic, as it was incorrect in the case of skipped slots. The updated code is simpler because regardless of whether the block is the first in the epoch we can check if an entry for the epoch boundary root already exists in the cache, and update the cache accordingly.

Additionally, to assist with flip-flopping justified epochs, move to cloning the balance cache rather than moving it. This should still be very fast in practice because the balances cache is a ~1.6MB `Vec`, and this operation is expected to only occur infrequently.
This commit is contained in:
Michael Sproul 2021-12-13 23:35:57 +00:00
parent b22ac95d7f
commit a43d5e161f
5 changed files with 137 additions and 75 deletions

View File

@ -12,7 +12,9 @@ use std::marker::PhantomData;
use std::sync::Arc;
use store::{Error as StoreError, HotColdDB, ItemStore};
use superstruct::superstruct;
use types::{BeaconBlock, BeaconState, BeaconStateError, Checkpoint, EthSpec, Hash256, Slot};
use types::{
BeaconBlock, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, Hash256, Slot,
};
#[derive(Debug)]
pub enum Error {
@ -56,24 +58,34 @@ pub fn get_effective_balances<T: EthSpec>(state: &BeaconState<T>) -> Vec<u64> {
.collect()
}
/// An item that is stored in the `BalancesCache`.
#[derive(PartialEq, Clone, Debug, Encode, Decode)]
struct CacheItem {
/// The block root at which `self.balances` are valid.
block_root: Hash256,
/// The effective balances from a `BeaconState` validator registry.
balances: Vec<u64>,
#[superstruct(
variants(V1, V8),
variant_attributes(derive(PartialEq, Clone, Debug, Encode, Decode)),
no_enum
)]
pub(crate) struct CacheItem {
pub(crate) block_root: Hash256,
#[superstruct(only(V8))]
pub(crate) epoch: Epoch,
pub(crate) balances: Vec<u64>,
}
/// Provides a cache to avoid reading `BeaconState` from disk when updating the current justified
/// checkpoint.
///
/// It is effectively a mapping of `epoch_boundary_block_root -> state.balances`.
#[derive(PartialEq, Clone, Default, Debug, Encode, Decode)]
pub(crate) type CacheItem = CacheItemV8;
#[superstruct(
variants(V1, V8),
variant_attributes(derive(PartialEq, Clone, Default, Debug, Encode, Decode)),
no_enum
)]
pub struct BalancesCache {
items: Vec<CacheItem>,
#[superstruct(only(V1))]
pub(crate) items: Vec<CacheItemV1>,
#[superstruct(only(V8))]
pub(crate) items: Vec<CacheItemV8>,
}
pub type BalancesCache = BalancesCacheV8;
impl BalancesCache {
/// Inspect the given `state` and determine the root of the block at the first slot of
/// `state.current_epoch`. If there is not already some entry for the given block root, then
@ -83,13 +95,8 @@ impl BalancesCache {
block_root: Hash256,
state: &BeaconState<E>,
) -> Result<(), Error> {
// We are only interested in balances from states that are at the start of an epoch,
// because this is where the `current_justified_checkpoint.root` will point.
if !Self::is_first_block_in_epoch(block_root, state)? {
return Ok(());
}
let epoch_boundary_slot = state.current_epoch().start_slot(E::slots_per_epoch());
let epoch = state.current_epoch();
let epoch_boundary_slot = epoch.start_slot(E::slots_per_epoch());
let epoch_boundary_root = if epoch_boundary_slot == state.slot() {
block_root
} else {
@ -98,9 +105,14 @@ impl BalancesCache {
*state.get_block_root(epoch_boundary_slot)?
};
if self.position(epoch_boundary_root).is_none() {
// Check if there already exists a cache entry for the epoch boundary block of the current
// epoch. We rely on the invariant that effective balances do not change for the duration
// of a single epoch, so even if the block on the epoch boundary itself is skipped we can
// still update its cache entry from any subsequent state in that epoch.
if self.position(epoch_boundary_root, epoch).is_none() {
let item = CacheItem {
block_root: epoch_boundary_root,
epoch,
balances: get_effective_balances(state),
};
@ -114,43 +126,18 @@ impl BalancesCache {
Ok(())
}
/// Returns `true` if the given `block_root` is the first/only block to have been processed in
/// the epoch of the given `state`.
///
/// We can determine if it is the first block by looking back through `state.block_roots` to
/// see if there is a block in the current epoch with a different root.
fn is_first_block_in_epoch<E: EthSpec>(
block_root: Hash256,
state: &BeaconState<E>,
) -> Result<bool, Error> {
let mut prior_block_found = false;
for slot in state.current_epoch().slot_iter(E::slots_per_epoch()) {
if slot < state.slot() {
if *state.get_block_root(slot)? != block_root {
prior_block_found = true;
break;
}
} else {
break;
}
}
Ok(!prior_block_found)
}
fn position(&self, block_root: Hash256) -> Option<usize> {
fn position(&self, block_root: Hash256, epoch: Epoch) -> Option<usize> {
self.items
.iter()
.position(|item| item.block_root == block_root)
.position(|item| item.block_root == block_root && item.epoch == epoch)
}
/// Get the balances for the given `block_root`, if any.
///
/// If some balances are found, they are removed from the cache.
pub fn get(&mut self, block_root: Hash256) -> Option<Vec<u64>> {
let i = self.position(block_root)?;
Some(self.items.remove(i).balances)
/// If some balances are found, they are cloned from the cache.
pub fn get(&mut self, block_root: Hash256, epoch: Epoch) -> Option<Vec<u64>> {
let i = self.position(block_root, epoch)?;
Some(self.items[i].balances.clone())
}
}
@ -303,7 +290,10 @@ where
fn set_justified_checkpoint(&mut self, checkpoint: Checkpoint) -> Result<(), Error> {
self.justified_checkpoint = checkpoint;
if let Some(balances) = self.balances_cache.get(self.justified_checkpoint.root) {
if let Some(balances) = self.balances_cache.get(
self.justified_checkpoint.root,
self.justified_checkpoint.epoch,
) {
metrics::inc_counter(&metrics::BALANCES_CACHE_HITS);
self.justified_balances = balances;
} else {
@ -338,16 +328,23 @@ where
}
/// A container which allows persisting the `BeaconForkChoiceStore` to the on-disk database.
#[superstruct(variants(V1, V7), variant_attributes(derive(Encode, Decode)), no_enum)]
#[superstruct(
variants(V1, V7, V8),
variant_attributes(derive(Encode, Decode)),
no_enum
)]
pub struct PersistedForkChoiceStore {
pub balances_cache: BalancesCache,
#[superstruct(only(V1, V7))]
pub balances_cache: BalancesCacheV1,
#[superstruct(only(V8))]
pub balances_cache: BalancesCacheV8,
pub time: Slot,
pub finalized_checkpoint: Checkpoint,
pub justified_checkpoint: Checkpoint,
pub justified_balances: Vec<u64>,
pub best_justified_checkpoint: Checkpoint,
#[superstruct(only(V7))]
#[superstruct(only(V7, V8))]
pub proposer_boost_root: Hash256,
}
pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV7;
pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV8;

View File

@ -1,19 +1,27 @@
use crate::beacon_fork_choice_store::{PersistedForkChoiceStoreV1, PersistedForkChoiceStoreV7};
use crate::beacon_fork_choice_store::{
PersistedForkChoiceStoreV1, PersistedForkChoiceStoreV7, PersistedForkChoiceStoreV8,
};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use store::{DBColumn, Error, StoreItem};
use superstruct::superstruct;
// If adding a new version you should update this type alias and fix the breakages.
pub type PersistedForkChoice = PersistedForkChoiceV7;
pub type PersistedForkChoice = PersistedForkChoiceV8;
#[superstruct(variants(V1, V7), variant_attributes(derive(Encode, Decode)), no_enum)]
#[superstruct(
variants(V1, V7, V8),
variant_attributes(derive(Encode, Decode)),
no_enum
)]
pub struct PersistedForkChoice {
pub fork_choice: fork_choice::PersistedForkChoice,
#[superstruct(only(V1))]
pub fork_choice_store: PersistedForkChoiceStoreV1,
#[superstruct(only(V7))]
pub fork_choice_store: PersistedForkChoiceStoreV7,
#[superstruct(only(V8))]
pub fork_choice_store: PersistedForkChoiceStoreV8,
}
macro_rules! impl_store_item {
@ -36,3 +44,4 @@ macro_rules! impl_store_item {
impl_store_item!(PersistedForkChoiceV1);
impl_store_item!(PersistedForkChoiceV7);
impl_store_item!(PersistedForkChoiceV8);

View File

@ -1,11 +1,11 @@
//! Utilities for managing database schema changes.
mod migration_schema_v6;
mod migration_schema_v7;
mod migration_schema_v8;
mod types;
use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY};
use crate::persisted_fork_choice::{PersistedForkChoiceV1, PersistedForkChoiceV7};
use crate::store::{get_key_for_col, KeyValueStoreOp};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use operation_pool::{PersistedOperationPool, PersistedOperationPoolBase};
use slog::{warn, Logger};
@ -113,12 +113,8 @@ pub fn migrate_schema<T: BeaconChainTypes>(
migration_schema_v6::update_execution_statuses::<T>(&mut persisted_fork_choice)
.map_err(StoreError::SchemaMigrationError)?;
let column = PersistedForkChoiceV1::db_column().into();
let key = FORK_CHOICE_DB_KEY.as_bytes();
let db_key = get_key_for_col(column, key);
let op =
KeyValueStoreOp::PutKeyValue(db_key, persisted_fork_choice.as_store_bytes());
ops.push(op);
// Store the converted fork choice store under the same key.
ops.push(persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY));
}
db.store_schema_version_atomically(to, ops)?;
@ -163,12 +159,22 @@ pub fn migrate_schema<T: BeaconChainTypes>(
}
// Store the converted fork choice store under the same key.
let column = PersistedForkChoiceV7::db_column().into();
let key = FORK_CHOICE_DB_KEY.as_bytes();
let db_key = get_key_for_col(column, key);
let op =
KeyValueStoreOp::PutKeyValue(db_key, persisted_fork_choice_v7.as_store_bytes());
ops.push(op);
ops.push(persisted_fork_choice_v7.as_kv_store_op(FORK_CHOICE_DB_KEY));
}
db.store_schema_version_atomically(to, ops)?;
Ok(())
}
// Migration to add an `epoch` key to the fork choice's balances cache.
(SchemaVersion(7), SchemaVersion(8)) => {
let mut ops = vec![];
let fork_choice_opt = db.get_item::<PersistedForkChoiceV7>(&FORK_CHOICE_DB_KEY)?;
if let Some(fork_choice) = fork_choice_opt {
let updated_fork_choice =
migration_schema_v8::update_fork_choice::<T>(fork_choice, db.clone())?;
ops.push(updated_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY));
}
db.store_schema_version_atomically(to, ops)?;

View File

@ -0,0 +1,50 @@
use crate::beacon_chain::BeaconChainTypes;
use crate::beacon_fork_choice_store::{
BalancesCacheV8, CacheItemV8, PersistedForkChoiceStoreV7, PersistedForkChoiceStoreV8,
};
use crate::persisted_fork_choice::{PersistedForkChoiceV7, PersistedForkChoiceV8};
use std::sync::Arc;
use store::{Error as StoreError, HotColdDB};
use types::EthSpec;
pub fn update_fork_choice<T: BeaconChainTypes>(
fork_choice: PersistedForkChoiceV7,
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<PersistedForkChoiceV8, StoreError> {
let PersistedForkChoiceStoreV7 {
balances_cache,
time,
finalized_checkpoint,
justified_checkpoint,
justified_balances,
best_justified_checkpoint,
proposer_boost_root,
} = fork_choice.fork_choice_store;
let mut fork_choice_store = PersistedForkChoiceStoreV8 {
balances_cache: BalancesCacheV8::default(),
time,
finalized_checkpoint,
justified_checkpoint,
justified_balances,
best_justified_checkpoint,
proposer_boost_root,
};
// Add epochs to the balances cache. It's safe to just use the block's epoch because
// before schema v8 the cache would always miss on skipped slots.
for item in balances_cache.items {
// Drop any blocks that aren't found, they're presumably too old and this is only a cache.
if let Some(block) = db.get_block(&item.block_root)? {
fork_choice_store.balances_cache.items.push(CacheItemV8 {
block_root: item.block_root,
epoch: block.slot().epoch(T::EthSpec::slots_per_epoch()),
balances: item.balances,
});
}
}
Ok(PersistedForkChoiceV8 {
fork_choice: fork_choice.fork_choice,
fork_choice_store,
})
}

View File

@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{Checkpoint, Hash256, Slot};
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(7);
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(8);
// All the keys that get stored under the `BeaconMeta` column.
//