Remove grandparents from snapshot cache (#2917)

## Issue Addressed

NA

## Proposed Changes

In https://github.com/sigp/lighthouse/pull/2832 we made some changes to the `SnapshotCache` to help deal with the one-block reorgs seen on mainnet (and testnets).

I believe the change in #2832 is good and we should keep it, but I think that in its present form it is causing the `SnapshotCache` to hold onto states that it doesn't need anymore. For example, a skip slot will result in one more `BeaconSnapshot` being stored in the cache.

This PR adds a new type of pruning that happens after a block is inserted into the cache. We will remove any snapshot from the cache that is a *grandparent* of the block being imported. Since we know the grandparent has two valid blocks built atop it, it is not at risk from a one-block re-org.
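
The eviction rule is easiest to see in isolation. Below is a minimal sketch of the idea using simplified, hypothetical types (`Root`, `Snapshot`, and a stripped-down cache) rather than Lighthouse's real ones; the actual change lives in `SnapshotCache::insert` (see the `snapshot_cache.rs` diff below), which finds the grandparent by walking the newly-inserted state's block roots and never evicts the current head snapshot.

```rust
/// A stand-in for a block root hash (hypothetical type for this sketch).
#[derive(Clone, Copy, PartialEq, Eq)]
struct Root(u64);

/// A stand-in for a cached `BeaconSnapshot` (hypothetical type for this sketch).
#[allow(dead_code)]
struct Snapshot {
    block_root: Root,
    parent_root: Root,
    grandparent_root: Root,
}

struct SnapshotCache {
    head_block_root: Root,
    snapshots: Vec<Snapshot>,
    max_len: usize,
}

impl SnapshotCache {
    /// Insert a snapshot, first evicting the *grandparent* of the block being inserted.
    ///
    /// The grandparent already has two valid descendants (the parent and this block), so it is
    /// no longer at risk from a one-block re-org and can be dropped to keep the cache small.
    fn insert(&mut self, snapshot: Snapshot) {
        let grandparent = snapshot.grandparent_root;
        let head = self.head_block_root;

        // Never evict the head snapshot, even if it happens to be the grandparent.
        self.snapshots
            .retain(|s| s.block_root == head || s.block_root != grandparent);

        if self.snapshots.len() < self.max_len {
            self.snapshots.push(snapshot);
        }
        // (The real cache also handles the at-capacity replacement case.)
    }
}

fn main() {
    let mut cache = SnapshotCache {
        head_block_root: Root(2),
        max_len: 4,
        snapshots: vec![
            Snapshot { block_root: Root(1), parent_root: Root(0), grandparent_root: Root(0) },
            Snapshot { block_root: Root(2), parent_root: Root(1), grandparent_root: Root(0) },
        ],
    };

    // Block 3 builds on 2, which builds on 1: inserting 3 evicts 1 (its grandparent).
    cache.insert(Snapshot { block_root: Root(3), parent_root: Root(2), grandparent_root: Root(1) });
    assert!(cache.snapshots.iter().all(|s| s.block_root != Root(1)));
    assert_eq!(cache.snapshots.len(), 2);
}
```

Keeping the head snapshot exempt mirrors the real implementation, where the head is always retained even if it happens to be the grandparent of the block being inserted.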

## Additional Info

NA

Commit: c11253a82f (parent: ceeab02e3a)
Author: Paul Hauner
Date: 2022-01-14 07:20:55 +00:00
3 changed files with 65 additions and 4 deletions

`beacon_node/beacon_chain/src/beacon_chain.rs`

```diff
@@ -2796,6 +2796,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                     beacon_block_root: block_root,
                 },
                 None,
+                &self.spec,
             )
         })
         .unwrap_or_else(|e| {
@@ -3740,6 +3741,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
             .map(|mut snapshot_cache| {
                 snapshot_cache.prune(new_finalized_checkpoint.epoch);
+                debug!(
+                    self.log,
+                    "Snapshot cache pruned";
+                    "new_len" => snapshot_cache.len(),
+                    "remaining_roots" => ?snapshot_cache.beacon_block_roots(),
+                );
             })
             .unwrap_or_else(|| {
                 error!(
```

`beacon_node/beacon_chain/src/metrics.rs`

```diff
@@ -4,8 +4,12 @@ use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
 use lazy_static::lazy_static;
 pub use lighthouse_metrics::*;
 use slot_clock::SlotClock;
+use std::time::Duration;
 use types::{BeaconState, Epoch, EthSpec, Hash256, Slot};
 
+/// The maximum time to wait for the snapshot cache lock during a metrics scrape.
+const SNAPSHOT_CACHE_TIMEOUT: Duration = Duration::from_millis(100);
+
 lazy_static! {
     /*
      * Block Processing
@@ -18,6 +22,10 @@ lazy_static! {
         "beacon_block_processing_successes_total",
         "Count of blocks processed without error"
     );
+    pub static ref BLOCK_PROCESSING_SNAPSHOT_CACHE_SIZE: Result<IntGauge> = try_create_int_gauge(
+        "beacon_block_processing_snapshot_cache_size",
+        "Count snapshots in the snapshot cache"
+    );
     pub static ref BLOCK_PROCESSING_SNAPSHOT_CACHE_MISSES: Result<IntCounter> = try_create_int_counter(
         "beacon_block_processing_snapshot_cache_misses",
         "Count of snapshot cache misses"
@@ -913,6 +921,16 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
 
     let attestation_stats = beacon_chain.op_pool.attestation_stats();
 
+    if let Some(snapshot_cache) = beacon_chain
+        .snapshot_cache
+        .try_write_for(SNAPSHOT_CACHE_TIMEOUT)
+    {
+        set_gauge(
+            &BLOCK_PROCESSING_SNAPSHOT_CACHE_SIZE,
+            snapshot_cache.len() as i64,
+        )
+    }
+
     set_gauge_by_usize(
         &OP_POOL_NUM_ATTESTATIONS,
         attestation_stats.num_attestations,
```

`beacon_node/beacon_chain/src/snapshot_cache.rs`

```diff
@@ -1,4 +1,5 @@
 use crate::BeaconSnapshot;
+use itertools::process_results;
 use std::cmp;
 use std::time::Duration;
 use types::{
@@ -164,9 +165,25 @@ impl<T: EthSpec> SnapshotCache<T> {
         }
     }
 
+    /// The block roots of all snapshots contained in `self`.
+    pub fn beacon_block_roots(&self) -> Vec<Hash256> {
+        self.snapshots.iter().map(|s| s.beacon_block_root).collect()
+    }
+
+    /// The number of snapshots contained in `self`.
+    pub fn len(&self) -> usize {
+        self.snapshots.len()
+    }
+
     /// Insert a snapshot, potentially removing an existing snapshot if `self` is at capacity (see
     /// struct-level documentation for more info).
-    pub fn insert(&mut self, snapshot: BeaconSnapshot<T>, pre_state: Option<BeaconState<T>>) {
+    pub fn insert(
+        &mut self,
+        snapshot: BeaconSnapshot<T>,
+        pre_state: Option<BeaconState<T>>,
+        spec: &ChainSpec,
+    ) {
+        let parent_root = snapshot.beacon_block.message().parent_root();
         let item = CacheItem {
             beacon_block: snapshot.beacon_block,
             beacon_block_root: snapshot.beacon_block_root,
@@ -174,6 +191,25 @@ impl<T: EthSpec> SnapshotCache<T> {
             pre_state,
         };
 
+        // Remove the grandparent of the block that was just inserted.
+        //
+        // Assuming it's unlikely to see re-orgs deeper than one block, this method helps keep the
+        // cache small by removing any states that already have more than one descendant.
+        //
+        // Remove the grandparent first to free up room in the cache.
+        let grandparent_result =
+            process_results(item.beacon_state.rev_iter_block_roots(spec), |iter| {
+                iter.map(|(_slot, root)| root)
+                    .find(|root| *root != item.beacon_block_root && *root != parent_root)
+            });
+        if let Ok(Some(grandparent_root)) = grandparent_result {
+            let head_block_root = self.head_block_root;
+            self.snapshots.retain(|snapshot| {
+                let root = snapshot.beacon_block_root;
+                root == head_block_root || root != grandparent_root
+            });
+        }
+
         if self.snapshots.len() < self.max_len {
             self.snapshots.push(item);
         } else {
@@ -384,7 +420,7 @@ mod test {
 
             *snapshot.beacon_state.slot_mut() =
                 Slot::from(i * MainnetEthSpec::slots_per_epoch() + 1);
-            cache.insert(snapshot, None);
+            cache.insert(snapshot, None, &spec);
 
             assert_eq!(
                 cache.snapshots.len(),
@@ -402,7 +438,7 @@ mod test {
         //      2              2
         //      3              3
         assert_eq!(cache.snapshots.len(), CACHE_SIZE);
-        cache.insert(get_snapshot(42), None);
+        cache.insert(get_snapshot(42), None, &spec);
 
         assert_eq!(cache.snapshots.len(), CACHE_SIZE);
         assert!(
@@ -462,7 +498,7 @@ mod test {
 
         // Over-fill the cache so it needs to eject some old values on insert.
        for i in 0..CACHE_SIZE as u64 {
-            cache.insert(get_snapshot(u64::max_value() - i), None);
+            cache.insert(get_snapshot(u64::max_value() - i), None, &spec);
        }
 
        // Ensure that the new head value was not removed from the cache.
```