lighthouse/beacon_node/beacon_chain/src/snapshot_cache.rs
Michael Sproul 775d222299 Enable proposer boost re-orging (#2860)
## Proposed Changes

With proposer boosting implemented (#2822) we have an opportunity to re-org out late blocks.

This PR adds three flags to the BN to control this behaviour:

* `--disable-proposer-reorgs`: turn aggressive re-orging off (it's on by default).
* `--proposer-reorg-threshold N`: attempt to orphan blocks with less than N% of the committee vote. If this parameter isn't set then N defaults to 20% when the feature is enabled.
* `--proposer-reorg-epochs-since-finalization N`: only attempt to re-org late blocks when the number of epochs since finalization is less than or equal to N. The default is 2 epochs, meaning re-orgs will only be attempted when the chain is finalizing optimally.

For safety, Lighthouse will only attempt a re-org under very specific conditions (see the sketch after this list):

1. The block being proposed is 1 slot after the canonical head, and the canonical head is 1 slot after its parent. In other words, at slot `n + 1` we build on the block from slot `n - 1` rather than the block from slot `n`.
2. The current canonical head received less than N% of the committee vote. N should be set depending on the proposer boost fraction itself, the fraction of the network that is believed to be applying it, and the size of the largest entity that could be hoarding votes.
3. The current canonical head arrived after the attestation deadline from our perspective. This condition was only added to support suppression of forkchoiceUpdated messages, but makes intuitive sense.
4. The block is being proposed in the first 2 seconds of the slot. This gives it time to propagate and receive the proposer boost.
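
Condensed into a Rust sketch, the decision boils down to a single predicate. This is illustrative only: the function name, the parameters and the percentage-based weight representation are assumptions, not the PR's actual API.

```rust
// Illustrative only: a condensed view of the four conditions above; all names
// and the percentage-based weights are assumptions, not the real implementation.
fn should_attempt_reorg(
    proposal_slot: u64,
    head_slot: u64,
    head_parent_slot: u64,
    head_weight_percent: u64,     // head's share of the committee vote, in %
    reorg_threshold_percent: u64, // defaults to 20 when the feature is enabled
    head_arrived_after_attestation_deadline: bool,
    seconds_into_proposal_slot: u64,
) -> bool {
    // 1. Single-slot re-org only: we build on the head's parent.
    proposal_slot == head_slot + 1
        && head_slot == head_parent_slot + 1
        // 2. The head is weak: it received less than the threshold of the committee vote.
        && head_weight_percent < reorg_threshold_percent
        // 3. The head arrived after the attestation deadline from our perspective.
        && head_arrived_after_attestation_deadline
        // 4. We are early enough in our own slot for the re-orging block to
        //    propagate and earn the proposer boost.
        && seconds_into_proposal_slot < 2
}
```

In the real implementation the attempt is additionally gated on the `--proposer-reorg-epochs-since-finalization` condition described above.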


## Additional Info

For the initial idea and background, see: https://github.com/ethereum/consensus-specs/pull/2353#issuecomment-950238004

There is also a specification for this feature here: https://github.com/ethereum/consensus-specs/pull/3034

Co-authored-by: Michael Sproul <micsproul@gmail.com>
Co-authored-by: pawan <pawandhananjay@gmail.com>
2022-12-13 09:57:26 +00:00


use crate::BeaconSnapshot;
use itertools::process_results;
use std::cmp;
use std::sync::Arc;
use std::time::Duration;
use types::{
beacon_state::CloneConfig, BeaconState, BlindedPayload, ChainSpec, Epoch, EthSpec, Hash256,
SignedBeaconBlock, Slot,
};
/// The default size of the cache.
pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4;
/// The minimum block delay to clone the state in the cache instead of removing it.
/// This helps keep block processing fast during re-orgs from late blocks.
fn minimum_block_delay_for_clone(seconds_per_slot: u64) -> Duration {
// If the block arrived at the attestation deadline or later, it might get re-orged.
Duration::from_secs(seconds_per_slot) / 3
}
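// Illustrative arithmetic, not part of the implementation: on mainnet, where
// `seconds_per_slot` is 12, the clone threshold works out to one third of the
// slot, i.e. the 4-second attestation deadline:
//
//     assert_eq!(minimum_block_delay_for_clone(12), Duration::from_secs(4));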
/// This snapshot is to be used for verifying a child of `self.beacon_block`.
#[derive(Debug)]
pub struct PreProcessingSnapshot<T: EthSpec> {
/// This state is equivalent to the `self.beacon_block.state_root()` state that has been
/// advanced forward one slot using `per_slot_processing`. This state is "primed and ready" for
/// the application of another block.
pub pre_state: BeaconState<T>,
/// This value is only set to `Some` if the `pre_state` was *not* advanced forward.
pub beacon_state_root: Option<Hash256>,
pub beacon_block: SignedBeaconBlock<T, BlindedPayload<T>>,
pub beacon_block_root: Hash256,
}
impl<T: EthSpec> From<BeaconSnapshot<T>> for PreProcessingSnapshot<T> {
fn from(snapshot: BeaconSnapshot<T>) -> Self {
let beacon_state_root = Some(snapshot.beacon_state_root());
Self {
pre_state: snapshot.beacon_state,
beacon_state_root,
beacon_block: snapshot.beacon_block.clone_as_blinded(),
beacon_block_root: snapshot.beacon_block_root,
}
}
}
impl<T: EthSpec> CacheItem<T> {
pub fn new_without_pre_state(snapshot: BeaconSnapshot<T>) -> Self {
Self {
beacon_block: snapshot.beacon_block,
beacon_block_root: snapshot.beacon_block_root,
beacon_state: snapshot.beacon_state,
pre_state: None,
}
}
fn clone_to_snapshot_with(&self, clone_config: CloneConfig) -> BeaconSnapshot<T> {
BeaconSnapshot {
beacon_state: self.beacon_state.clone_with(clone_config),
beacon_block: self.beacon_block.clone(),
beacon_block_root: self.beacon_block_root,
}
}
pub fn into_pre_state(self) -> PreProcessingSnapshot<T> {
// Do not include the beacon state root if the state has been advanced.
let beacon_state_root =
Some(self.beacon_block.state_root()).filter(|_| self.pre_state.is_none());
PreProcessingSnapshot {
beacon_block: self.beacon_block.clone_as_blinded(),
beacon_block_root: self.beacon_block_root,
pre_state: self.pre_state.unwrap_or(self.beacon_state),
beacon_state_root,
}
}
pub fn clone_as_pre_state(&self) -> PreProcessingSnapshot<T> {
// Do not include the beacon state root if the state has been advanced.
let beacon_state_root =
Some(self.beacon_block.state_root()).filter(|_| self.pre_state.is_none());
PreProcessingSnapshot {
beacon_block: self.beacon_block.clone_as_blinded(),
beacon_block_root: self.beacon_block_root,
pre_state: self
.pre_state
.as_ref()
.map_or_else(|| self.beacon_state.clone(), |pre_state| pre_state.clone()),
beacon_state_root,
}
}
}
/// The information required for block production.
pub struct BlockProductionPreState<T: EthSpec> {
/// This state may or may not have been advanced forward a single slot.
///
/// See the documentation in the `crate::state_advance_timer` module for more information.
pub pre_state: BeaconState<T>,
/// This value will only be `Some` if `self.pre_state` was **not** advanced forward a single
/// slot.
///
/// This value can be used to avoid tree-hashing the state during the first call to
/// `per_slot_processing`.
pub state_root: Option<Hash256>,
}
pub enum StateAdvance<T: EthSpec> {
/// The cache does not contain the supplied block root.
BlockNotFound,
/// The cache contains the supplied block root but the state has already been advanced.
AlreadyAdvanced,
/// The cache contains the supplied block root and the state has not yet been advanced.
State {
state: Box<BeaconState<T>>,
state_root: Hash256,
block_slot: Slot,
},
}
/// The item stored in the `SnapshotCache`.
pub struct CacheItem<T: EthSpec> {
beacon_block: Arc<SignedBeaconBlock<T>>,
beacon_block_root: Hash256,
/// This state is equivalent to `self.beacon_block.state_root()`.
beacon_state: BeaconState<T>,
/// This state is equivalent to `self.beacon_state` that has had `per_slot_processing` applied
/// to it. This state assists in optimizing block processing.
pre_state: Option<BeaconState<T>>,
}
impl<T: EthSpec> Into<BeaconSnapshot<T>> for CacheItem<T> {
fn into(self) -> BeaconSnapshot<T> {
BeaconSnapshot {
beacon_state: self.beacon_state,
beacon_block: self.beacon_block,
beacon_block_root: self.beacon_block_root,
}
}
}
/// Provides a cache of `BeaconSnapshot` that is intended primarily for block processing.
///
/// ## Cache Queuing
///
/// The cache has a non-standard queue mechanism (specifically, it is not LRU).
///
/// The cache has a max number of elements (`max_len`). Until `max_len` is achieved, all snapshots
/// are simply added to the queue. Once `max_len` is achieved, adding a new snapshot will cause an
/// existing snapshot to be ejected. The ejected snapshot will:
///
/// - Never be the `head_block_root`.
/// - Be the snapshot with the lowest `state.slot` (ties broken arbitrarily).
pub struct SnapshotCache<T: EthSpec> {
max_len: usize,
head_block_root: Hash256,
snapshots: Vec<CacheItem<T>>,
}
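// Illustrative ejection example (made-up slots, not real data): with `max_len = 4`,
// a head snapshot at slot 32 and non-head snapshots at slots 33, 65 and 97,
// inserting a fifth snapshot overwrites the slot-33 entry, since it has the lowest
// slot among the non-head snapshots; the snapshot matching `head_block_root` is
// never ejected.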
impl<T: EthSpec> SnapshotCache<T> {
/// Instantiate a new cache which contains the `head` snapshot.
///
/// Setting `max_len = 0` is equivalent to setting `max_len = 1`.
pub fn new(max_len: usize, head: BeaconSnapshot<T>) -> Self {
Self {
max_len: cmp::max(max_len, 1),
head_block_root: head.beacon_block_root,
snapshots: vec![CacheItem::new_without_pre_state(head)],
}
}
/// The block roots of all snapshots contained in `self`.
pub fn beacon_block_roots(&self) -> Vec<Hash256> {
self.snapshots.iter().map(|s| s.beacon_block_root).collect()
}
/// The number of snapshots contained in `self`.
pub fn len(&self) -> usize {
self.snapshots.len()
}
/// Insert a snapshot, potentially removing an existing snapshot if `self` is at capacity (see
/// struct-level documentation for more info).
pub fn insert(
&mut self,
snapshot: BeaconSnapshot<T>,
pre_state: Option<BeaconState<T>>,
spec: &ChainSpec,
) {
let parent_root = snapshot.beacon_block.message().parent_root();
let item = CacheItem {
beacon_block: snapshot.beacon_block.clone(),
beacon_block_root: snapshot.beacon_block_root,
beacon_state: snapshot.beacon_state,
pre_state,
};
// Remove the grandparent of the block that was just inserted.
//
// Assuming it's unlikely to see re-orgs deeper than one block, this method helps keep the
// cache small by removing any states that already have more than one descendant.
//
// Remove the grandparent first to free up room in the cache.
let grandparent_result =
process_results(item.beacon_state.rev_iter_block_roots(spec), |iter| {
iter.map(|(_slot, root)| root)
.find(|root| *root != item.beacon_block_root && *root != parent_root)
});
if let Ok(Some(grandparent_root)) = grandparent_result {
let head_block_root = self.head_block_root;
self.snapshots.retain(|snapshot| {
let root = snapshot.beacon_block_root;
root == head_block_root || root != grandparent_root
});
}
if self.snapshots.len() < self.max_len {
self.snapshots.push(item);
} else {
let insert_at = self
.snapshots
.iter()
.enumerate()
.filter_map(|(i, snapshot)| {
if snapshot.beacon_block_root != self.head_block_root {
Some((i, snapshot.beacon_state.slot()))
} else {
None
}
})
.min_by_key(|(_i, slot)| *slot)
.map(|(i, _slot)| i);
if let Some(i) = insert_at {
self.snapshots[i] = item;
}
}
}
/// If available, returns a `PreProcessingSnapshot` that should be used for importing/processing a
/// block. The method will remove the snapshot from `self`, carrying across any caches that may or
/// may not be built.
///
/// In the event the block being processed was observed late, clone the cache instead of
/// moving it. This allows us to process the next block quickly in the case of a re-org.
/// Additionally, if a slot was skipped, clone the cache. This ensures blocks that arrive more
/// than 1 slot after their parent still have access to the cache and can be processed quickly.
pub fn get_state_for_block_processing(
&mut self,
block_root: Hash256,
block_slot: Slot,
block_delay: Option<Duration>,
spec: &ChainSpec,
) -> Option<(PreProcessingSnapshot<T>, bool)> {
self.snapshots
.iter()
.position(|snapshot| snapshot.beacon_block_root == block_root)
.map(|i| {
if let Some(cache) = self.snapshots.get(i) {
// Avoid cloning the block during sync (when the `block_delay` is `None`).
if let Some(delay) = block_delay {
if delay >= minimum_block_delay_for_clone(spec.seconds_per_slot)
&& delay <= Duration::from_secs(spec.seconds_per_slot) * 4
|| block_slot > cache.beacon_block.slot() + 1
{
return (cache.clone_as_pre_state(), true);
}
}
}
(self.snapshots.remove(i).into_pre_state(), false)
})
}
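// Illustrative timing example (mainnet assumptions, not part of the
// implementation): with `seconds_per_slot = 12`, a block whose `block_delay` is
// 5 seconds arrived after the 4-second attestation deadline but within the
// 48-second upper bound, so the snapshot is cloned and kept in case the block is
// re-orged out; a block arriving after 2 seconds (without skipping a slot) is
// moved out of the cache instead.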
/// If available, obtains a clone of a `BeaconState` that should be used for block production.
/// The clone will use `CloneConfig::all()`, ensuring any tree-hash cache is cloned too.
///
/// ## Note
///
/// This method clones the `BeaconState` (instead of removing it) since we assume that any block
/// we produce will soon be pushed to the `BeaconChain` for importing/processing. Keeping a copy
/// of that `BeaconState` in `self` will greatly help with import times.
pub fn get_state_for_block_production(
&self,
block_root: Hash256,
) -> Option<BlockProductionPreState<T>> {
self.snapshots
.iter()
.find(|snapshot| snapshot.beacon_block_root == block_root)
.map(|snapshot| {
if let Some(pre_state) = &snapshot.pre_state {
BlockProductionPreState {
pre_state: pre_state.clone_with(CloneConfig::all()),
state_root: None,
}
} else {
BlockProductionPreState {
pre_state: snapshot.beacon_state.clone_with(CloneConfig::all()),
state_root: Some(snapshot.beacon_block.state_root()),
}
}
})
}
/// If there is a snapshot with `block_root`, clone it and return the clone.
pub fn get_cloned(
&self,
block_root: Hash256,
clone_config: CloneConfig,
) -> Option<BeaconSnapshot<T>> {
self.snapshots
.iter()
.find(|snapshot| snapshot.beacon_block_root == block_root)
.map(|snapshot| snapshot.clone_to_snapshot_with(clone_config))
}
pub fn get_for_state_advance(&mut self, block_root: Hash256) -> StateAdvance<T> {
if let Some(snapshot) = self
.snapshots
.iter_mut()
.find(|snapshot| snapshot.beacon_block_root == block_root)
{
if snapshot.pre_state.is_some() {
StateAdvance::AlreadyAdvanced
} else {
let cloned = snapshot
.beacon_state
.clone_with(CloneConfig::committee_caches_only());
StateAdvance::State {
state: Box::new(std::mem::replace(&mut snapshot.beacon_state, cloned)),
state_root: snapshot.beacon_block.state_root(),
block_slot: snapshot.beacon_block.slot(),
}
}
} else {
StateAdvance::BlockNotFound
}
}
pub fn update_pre_state(&mut self, block_root: Hash256, state: BeaconState<T>) -> Option<()> {
self.snapshots
.iter_mut()
.find(|snapshot| snapshot.beacon_block_root == block_root)
.map(|snapshot| {
snapshot.pre_state = Some(state);
})
}
/// Removes all snapshots from the queue whose slots are at or before the start slot of the
/// finalized epoch.
pub fn prune(&mut self, finalized_epoch: Epoch) {
self.snapshots.retain(|snapshot| {
snapshot.beacon_state.slot() > finalized_epoch.start_slot(T::slots_per_epoch())
})
}
/// Inform the cache that the head of the beacon chain has changed.
///
/// The snapshot that matches this `head_block_root` will never be ejected from the cache
/// during `Self::insert`.
pub fn update_head(&mut self, head_block_root: Hash256) {
self.head_block_root = head_block_root
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::test_utils::{BeaconChainHarness, EphemeralHarnessType};
use types::{
test_utils::generate_deterministic_keypair, BeaconBlock, Epoch, MainnetEthSpec,
SignedBeaconBlock, Slot,
};
fn get_harness() -> BeaconChainHarness<EphemeralHarnessType<MainnetEthSpec>> {
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.default_spec()
.deterministic_keypairs(1)
.fresh_ephemeral_store()
.build();
harness.advance_slot();
harness
}
const CACHE_SIZE: usize = 4;
fn get_snapshot(i: u64) -> BeaconSnapshot<MainnetEthSpec> {
let spec = MainnetEthSpec::default_spec();
let beacon_state = get_harness().chain.head_beacon_state_cloned();
let signed_beacon_block = SignedBeaconBlock::from_block(
BeaconBlock::empty(&spec),
generate_deterministic_keypair(0)
.sk
.sign(Hash256::from_low_u64_be(42)),
);
BeaconSnapshot {
beacon_state,
beacon_block: Arc::new(signed_beacon_block),
beacon_block_root: Hash256::from_low_u64_be(i),
}
}
#[test]
fn insert_get_prune_update() {
let spec = MainnetEthSpec::default_spec();
let mut cache = SnapshotCache::new(CACHE_SIZE, get_snapshot(0));
// Insert a bunch of entries in the cache. It should look like this:
//
// Index Root
// 0 0 <--head
// 1 1
// 2 2
// 3 3
for i in 1..CACHE_SIZE as u64 {
let mut snapshot = get_snapshot(i);
// Each snapshot should be one slot into an epoch, with each snapshot one epoch apart.
*snapshot.beacon_state.slot_mut() =
Slot::from(i * MainnetEthSpec::slots_per_epoch() + 1);
cache.insert(snapshot, None, &spec);
assert_eq!(
cache.snapshots.len(),
i as usize + 1,
"cache length should be as expected"
);
assert_eq!(cache.head_block_root, Hash256::from_low_u64_be(0));
}
// Insert a new value in the cache. Afterwards it should look like:
//
// Index Root
// 0 0 <--head
// 1 42
// 2 2
// 3 3
assert_eq!(cache.snapshots.len(), CACHE_SIZE);
cache.insert(get_snapshot(42), None, &spec);
assert_eq!(cache.snapshots.len(), CACHE_SIZE);
assert!(
cache
.get_state_for_block_processing(
Hash256::from_low_u64_be(1),
Slot::new(0),
None,
&spec
)
.is_none(),
"the snapshot with the lowest slot should have been removed during the insert function"
);
assert!(cache
.get_cloned(Hash256::from_low_u64_be(1), CloneConfig::none())
.is_none());
assert_eq!(
cache
.get_cloned(Hash256::from_low_u64_be(0), CloneConfig::none())
.expect("the head should still be in the cache")
.beacon_block_root,
Hash256::from_low_u64_be(0),
"get_cloned should get the correct snapshot"
);
assert_eq!(
cache
.get_state_for_block_processing(
Hash256::from_low_u64_be(0),
Slot::new(0),
None,
&spec
)
.expect("the head should still be in the cache")
.0
.beacon_block_root,
Hash256::from_low_u64_be(0),
"get_state_for_block_processing should get the correct snapshot"
);
assert_eq!(
cache.snapshots.len(),
CACHE_SIZE - 1,
"get_state_for_block_processing should shorten the cache"
);
// Prune the cache. Afterwards it should look like:
//
// Index Root
// 0 2
// 1 3
cache.prune(Epoch::new(2));
assert_eq!(cache.snapshots.len(), 2);
cache.update_head(Hash256::from_low_u64_be(2));
// Over-fill the cache so it needs to eject some old values on insert.
for i in 0..CACHE_SIZE as u64 {
cache.insert(get_snapshot(u64::max_value() - i), None, &spec);
}
// Ensure that the new head value was not removed from the cache.
assert_eq!(
cache
.get_state_for_block_processing(
Hash256::from_low_u64_be(2),
Slot::new(0),
None,
&spec
)
.expect("the new head should still be in the cache")
.0
.beacon_block_root,
Hash256::from_low_u64_be(2),
"get_state_for_block_processing should get the correct snapshot"
);
}
}