diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index f94332c92..1cb3625b0 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -70,6 +70,7 @@ use state_processing::{ use std::borrow::Cow; use std::fs; use std::io::Write; +use std::time::Duration; use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp}; use tree_hash::TreeHash; use types::{ @@ -1418,6 +1419,8 @@ fn load_parent( ), BlockError, > { + let spec = &chain.spec; + // Reject any block if its parent is not known to fork choice. // // A block that is not in fork choice is either: @@ -1436,15 +1439,43 @@ fn load_parent( return Err(BlockError::ParentUnknown(Box::new(block))); } + let block_delay = chain + .block_times_cache + .read() + .get_block_delays( + block.canonical_root(), + chain + .slot_clock + .start_of(block.slot()) + .unwrap_or_else(|| Duration::from_secs(0)), + ) + .observed; + let db_read_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_READ); - let result = if let Some(snapshot) = chain + let result = if let Some((snapshot, cloned)) = chain .snapshot_cache .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) .and_then(|mut snapshot_cache| { - snapshot_cache.get_state_for_block_processing(block.parent_root()) + snapshot_cache.get_state_for_block_processing( + block.parent_root(), + block.slot(), + block_delay, + spec, + ) }) { - Ok((snapshot.into_pre_state(), block)) + if cloned { + metrics::inc_counter(&metrics::BLOCK_PROCESSING_SNAPSHOT_CACHE_CLONES); + debug!( + chain.log, + "Cloned snapshot for late block/skipped slot"; + "slot" => %block.slot(), + "parent_slot" => %snapshot.beacon_block.slot(), + "parent_root" => ?block.parent_root(), + "block_delay" => ?block_delay, + ); + } + Ok((snapshot, block)) } else { // Load the blocks parent block from the database, returning invalid if that block is not // found. 
@@ -1474,6 +1505,16 @@ fn load_parent( BeaconChainError::DBInconsistent(format!("Missing state {:?}", parent_state_root)) })?; + metrics::inc_counter(&metrics::BLOCK_PROCESSING_SNAPSHOT_CACHE_MISSES); + debug!( + chain.log, + "Missed snapshot cache"; + "slot" => block.slot(), + "parent_slot" => parent_block.slot(), + "parent_root" => ?block.parent_root(), + "block_delay" => ?block_delay, + ); + Ok(( PreProcessingSnapshot { beacon_block: parent_block, diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 44b267647..32ebe7092 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -18,6 +18,14 @@ lazy_static! { "beacon_block_processing_successes_total", "Count of blocks processed without error" ); + pub static ref BLOCK_PROCESSING_SNAPSHOT_CACHE_MISSES: Result<IntCounter> = try_create_int_counter( + "beacon_block_processing_snapshot_cache_misses", + "Count of snapshot cache misses" + ); + pub static ref BLOCK_PROCESSING_SNAPSHOT_CACHE_CLONES: Result<IntCounter> = try_create_int_counter( + "beacon_block_processing_snapshot_cache_clones", + "Count of snapshot cache clones" + ); pub static ref BLOCK_PROCESSING_TIMES: Result<Histogram> = try_create_histogram("beacon_block_processing_seconds", "Full runtime of block processing"); pub static ref BLOCK_PROCESSING_BLOCK_ROOT: Result<Histogram> = try_create_histogram( diff --git a/beacon_node/beacon_chain/src/snapshot_cache.rs b/beacon_node/beacon_chain/src/snapshot_cache.rs index e273c3521..4f7124de3 100644 --- a/beacon_node/beacon_chain/src/snapshot_cache.rs +++ b/beacon_node/beacon_chain/src/snapshot_cache.rs @@ -1,12 +1,18 @@ use crate::BeaconSnapshot; use std::cmp; +use std::time::Duration; use types::{ - beacon_state::CloneConfig, BeaconState, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot, + beacon_state::CloneConfig, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, + Slot, }; /// The default size of the cache. 
pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4; +/// The minimum block delay to clone the state in the cache instead of removing it. +/// This helps keep block processing fast during re-orgs from late blocks. +const MINIMUM_BLOCK_DELAY_FOR_CLONE: Duration = Duration::from_secs(6); + /// This snapshot is to be used for verifying a child of `self.beacon_block`. #[derive(Debug)] pub struct PreProcessingSnapshot<T: EthSpec> { @@ -62,6 +68,22 @@ impl<T: EthSpec> CacheItem<T> { beacon_state_root, } } + + pub fn clone_as_pre_state(&self) -> PreProcessingSnapshot<T> { + // Do not include the beacon state root if the state has been advanced. + let beacon_state_root = + Some(self.beacon_block.state_root()).filter(|_| self.pre_state.is_none()); + + PreProcessingSnapshot { + beacon_block: self.beacon_block.clone(), + beacon_block_root: self.beacon_block_root, + pre_state: self + .pre_state + .as_ref() + .map_or_else(|| self.beacon_state.clone(), |pre_state| pre_state.clone()), + beacon_state_root, + } + } } /// The information required for block production. @@ -178,11 +200,36 @@ impl<T: EthSpec> SnapshotCache<T> { /// If available, returns a `CacheItem` that should be used for importing/processing a block. /// The method will remove the block from `self`, carrying across any caches that may or may not /// be built. - pub fn get_state_for_block_processing(&mut self, block_root: Hash256) -> Option<CacheItem<T>> { + /// + /// In the event the block being processed was observed late, clone the cache instead of + /// moving it. This allows us to process the next block quickly in the case of a re-org. + /// Additionally, if the slot was skipped, clone the cache. This ensures blocks that are + /// later than 1 slot still have access to the cache and can be processed quickly. 
+ pub fn get_state_for_block_processing( + &mut self, + block_root: Hash256, + block_slot: Slot, + block_delay: Option<Duration>, + spec: &ChainSpec, + ) -> Option<(PreProcessingSnapshot<T>, bool)> { self.snapshots .iter() .position(|snapshot| snapshot.beacon_block_root == block_root) - .map(|i| self.snapshots.remove(i)) + .map(|i| { + if let Some(cache) = self.snapshots.get(i) { + if block_slot > cache.beacon_block.slot() + 1 { + return (cache.clone_as_pre_state(), true); + } + if let Some(delay) = block_delay { + if delay >= MINIMUM_BLOCK_DELAY_FOR_CLONE + && delay <= Duration::from_secs(spec.seconds_per_slot) * 4 + { + return (cache.clone_as_pre_state(), true); + } + } + } + (self.snapshots.remove(i).into_pre_state(), false) + }) } /// If available, obtains a clone of a `BeaconState` that should be used for block production. @@ -320,6 +367,7 @@ mod test { #[test] fn insert_get_prune_update() { + let spec = MainnetEthSpec::default_spec(); let mut cache = SnapshotCache::new(CACHE_SIZE, get_snapshot(0)); // Insert a bunch of entries in the cache. It should look like this: @@ -359,7 +407,12 @@ mod test { assert!( cache - .get_state_for_block_processing(Hash256::from_low_u64_be(1)) + .get_state_for_block_processing( + Hash256::from_low_u64_be(1), + Slot::new(0), + None, + &spec + ) .is_none(), "the snapshot with the lowest slot should have been removed during the insert function" ); @@ -377,8 +430,14 @@ mod test { ); assert_eq!( cache - .get_state_for_block_processing(Hash256::from_low_u64_be(0)) + .get_state_for_block_processing( + Hash256::from_low_u64_be(0), + Slot::new(0), + None, + &spec + ) .expect("the head should still be in the cache") + .0 .beacon_block_root, Hash256::from_low_u64_be(0), "get_state_for_block_processing should get the correct snapshot" @@ -409,8 +468,14 @@ mod test { // Ensure that the new head value was not removed from the cache. 
assert_eq!( cache - .get_state_for_block_processing(Hash256::from_low_u64_be(2)) + .get_state_for_block_processing( + Hash256::from_low_u64_be(2), + Slot::new(0), + None, + &spec + ) .expect("the new head should still be in the cache") + .0 .beacon_block_root, Hash256::from_low_u64_be(2), "get_state_for_block_processing should get the correct snapshot"