Customisable shuffling cache size (#4081)

This PR enables the user to adjust the shuffling cache size.

This is useful for some HTTP API requests which require re-computing old shufflings. This PR currently optimizes the
beacon/states/{state_id}/committees HTTP API by first checking the cache before re-building shuffling.

If the shuffling is set to a non-default value, then the HTTP API request will also fill the cache when as it constructs new shufflings.

If the CLI flag is not present or the value is set to the default of 16 the default behaviour is observed.


Co-authored-by: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
Age Manning 2023-03-21 05:14:59 +00:00
parent 76a2007b64
commit 785a9171e6
8 changed files with 151 additions and 42 deletions

View File

@ -765,6 +765,7 @@ where
let genesis_time = head_snapshot.beacon_state.genesis_time();
let head_for_snapshot_cache = head_snapshot.clone();
let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
let shuffling_cache_size = self.chain_config.shuffling_cache_size;
let beacon_chain = BeaconChain {
spec: self.spec,
@ -818,7 +819,7 @@ where
DEFAULT_SNAPSHOT_CACHE_SIZE,
head_for_snapshot_cache,
)),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new(shuffling_cache_size)),
eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())),
beacon_proposer_cache: <_>::default(),
block_times_cache: <_>::default(),

View File

@ -67,6 +67,8 @@ pub struct ChainConfig {
pub prepare_payload_lookahead: Duration,
/// Use EL-free optimistic sync for the finalized part of the chain.
pub optimistic_finalized_sync: bool,
/// The size of the shuffling cache,
pub shuffling_cache_size: usize,
/// Whether to send payload attributes every slot, regardless of connected proposers.
///
/// This is useful for block builders and testing.
@ -97,6 +99,7 @@ impl Default for ChainConfig {
prepare_payload_lookahead: Duration::from_secs(4),
// This value isn't actually read except in tests.
optimistic_finalized_sync: true,
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
always_prepare_payload: false,
}
}

View File

@ -40,7 +40,7 @@ mod persisted_fork_choice;
mod pre_finalization_cache;
pub mod proposer_prep_service;
pub mod schema_change;
mod shuffling_cache;
pub mod shuffling_cache;
mod snapshot_cache;
pub mod state_advance_timer;
pub mod sync_committee_rewards;

View File

@ -9,7 +9,7 @@ use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256
/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash +
/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this
/// ignores a few extra bytes in the caches that should be insignificant compared to the indices).
const CACHE_SIZE: usize = 16;
pub const DEFAULT_CACHE_SIZE: usize = 16;
/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this
/// limits the number of concurrent states that can be loaded into memory for the committee cache.
@ -54,9 +54,9 @@ pub struct ShufflingCache {
}
impl ShufflingCache {
pub fn new() -> Self {
pub fn new(cache_size: usize) -> Self {
Self {
cache: LruCache::new(CACHE_SIZE),
cache: LruCache::new(cache_size),
}
}
@ -172,7 +172,7 @@ impl ToArcCommitteeCache for Arc<CommitteeCache> {
impl Default for ShufflingCache {
fn default() -> Self {
Self::new()
Self::new(DEFAULT_CACHE_SIZE)
}
}
@ -249,7 +249,7 @@ mod test {
fn resolved_promise() {
let (committee_a, _) = committee_caches();
let id_a = shuffling_id(1);
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();
// Create a promise.
let sender = cache.create_promise(id_a.clone()).unwrap();
@ -276,7 +276,7 @@ mod test {
#[test]
fn unresolved_promise() {
let id_a = shuffling_id(1);
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();
// Create a promise.
let sender = cache.create_promise(id_a.clone()).unwrap();
@ -301,7 +301,7 @@ mod test {
fn two_promises() {
let (committee_a, committee_b) = committee_caches();
let (id_a, id_b) = (shuffling_id(1), shuffling_id(2));
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();
// Create promise A.
let sender_a = cache.create_promise(id_a.clone()).unwrap();
@ -355,7 +355,7 @@ mod test {
#[test]
fn too_many_promises() {
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();
for i in 0..MAX_CONCURRENT_PROMISES {
cache.create_promise(shuffling_id(i as u64)).unwrap();

View File

@ -54,8 +54,8 @@ use system_health::observe_system_health_bn;
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{
Attestation, AttestationData, AttesterSlashing, BeaconStateError, BlindedPayload,
CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError,
BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
SignedBeaconBlock, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
@ -784,39 +784,112 @@ pub fn serve<T: BeaconChainTypes>(
let current_epoch = state.current_epoch();
let epoch = query.epoch.unwrap_or(current_epoch);
let committee_cache =
match RelativeEpoch::from_epoch(current_epoch, epoch) {
Ok(relative_epoch)
if state
.committee_cache_is_initialized(relative_epoch) =>
{
state.committee_cache(relative_epoch).map(Cow::Borrowed)
}
_ => CommitteeCache::initialized(state, epoch, &chain.spec)
// Attempt to obtain the committee_cache from the beacon chain
let decision_slot = (epoch.saturating_sub(2u64))
.end_slot(T::EthSpec::slots_per_epoch());
// Find the decision block and skip to another method on any kind
// of failure
let shuffling_id = if let Ok(Some(shuffling_decision_block)) =
chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev)
{
Some(AttestationShufflingId {
shuffling_epoch: epoch,
shuffling_decision_block,
})
} else {
None
};
// Attempt to read from the chain cache if there exists a
// shuffling_id
let maybe_cached_shuffling = if let Some(shuffling_id) =
shuffling_id.as_ref()
{
chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
.and_then(|mut cache_write| cache_write.get(shuffling_id))
.and_then(|cache_item| cache_item.wait().ok())
} else {
None
};
let committee_cache = if let Some(ref shuffling) =
maybe_cached_shuffling
{
Cow::Borrowed(&**shuffling)
} else {
let possibly_built_cache =
match RelativeEpoch::from_epoch(current_epoch, epoch) {
Ok(relative_epoch)
if state.committee_cache_is_initialized(
relative_epoch,
) =>
{
state
.committee_cache(relative_epoch)
.map(Cow::Borrowed)
}
_ => CommitteeCache::initialized(
state,
epoch,
&chain.spec,
)
.map(Cow::Owned),
}
.map_err(|e| match e {
BeaconStateError::EpochOutOfBounds => {
let max_sprp =
T::EthSpec::slots_per_historical_root() as u64;
let first_subsequent_restore_point_slot = ((epoch
.start_slot(T::EthSpec::slots_per_epoch())
/ max_sprp)
+ 1)
* max_sprp;
if epoch < current_epoch {
warp_utils::reject::custom_bad_request(format!(
"epoch out of bounds, try state at slot {}",
first_subsequent_restore_point_slot,
))
} else {
warp_utils::reject::custom_bad_request(
"epoch out of bounds, too far in future".into(),
)
}
.map_err(|e| {
match e {
BeaconStateError::EpochOutOfBounds => {
let max_sprp =
T::EthSpec::slots_per_historical_root()
as u64;
let first_subsequent_restore_point_slot =
((epoch.start_slot(
T::EthSpec::slots_per_epoch(),
) / max_sprp)
+ 1)
* max_sprp;
if epoch < current_epoch {
warp_utils::reject::custom_bad_request(
format!(
"epoch out of bounds, \
try state at slot {}",
first_subsequent_restore_point_slot,
),
)
} else {
warp_utils::reject::custom_bad_request(
"epoch out of bounds, \
too far in future"
.into(),
)
}
}
_ => {
warp_utils::reject::beacon_chain_error(e.into())
}
}
})?;
// Attempt to write to the beacon cache (only if the cache
// size is not the default value).
if chain.config.shuffling_cache_size
!= beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE
{
if let Some(shuffling_id) = shuffling_id {
if let Some(mut cache_write) = chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
{
cache_write.insert_committee_cache(
shuffling_id,
&*possibly_built_cache,
);
}
}
_ => warp_utils::reject::beacon_chain_error(e.into()),
})?;
}
possibly_built_cache
};
// Use either the supplied slot or all slots in the epoch.
let slots =

View File

@ -370,6 +370,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
address of this server (e.g., http://localhost:5054).")
.takes_value(true),
)
.arg(
Arg::with_name("shuffling-cache-size")
.long("shuffling-cache-size")
.help("Some HTTP API requests can be optimised by caching the shufflings at each epoch. \
This flag allows the user to set the shuffling cache size in epochs. \
Shufflings are dependent on validator count and setting this value to a large number can consume a large amount of memory.")
.takes_value(true)
)
/*
* Monitoring metrics

View File

@ -148,6 +148,10 @@ pub fn get_config<E: EthSpec>(
client_config.http_api.allow_sync_stalled = true;
}
if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? {
client_config.chain.shuffling_cache_size = cache_size;
}
/*
* Prometheus metrics HTTP server
*/

View File

@ -118,6 +118,26 @@ fn disable_lock_timeouts_flag() {
.with_config(|config| assert!(!config.chain.enable_lock_timeouts));
}
#[test]
fn shuffling_cache_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| {
assert_eq!(
config.chain.shuffling_cache_size,
beacon_node::beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE
)
});
}
#[test]
fn shuffling_cache_set() {
CommandLineTest::new()
.flag("shuffling-cache-size", Some("500"))
.run_with_zero_port()
.with_config(|config| assert_eq!(config.chain.shuffling_cache_size, 500));
}
#[test]
fn fork_choice_before_proposal_timeout_default() {
CommandLineTest::new()