diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4f7d180ea..de7cc7e92 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1,4 +1,5 @@ use crate::checkpoint::CheckPoint; +use crate::checkpoint_cache::CheckPointCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::{EventHandler, EventKind}; @@ -93,6 +94,13 @@ pub enum AttestationProcessingOutcome { Invalid(AttestationValidationError), } +pub struct HeadInfo { + pub slot: Slot, + pub block_root: Hash256, + pub state_root: Hash256, + pub finalized_checkpoint: types::Checkpoint, +} + pub trait BeaconChainTypes: Send + Sync + 'static { type Store: store::Store; type StoreMigrator: store::Migrate; @@ -129,6 +137,8 @@ pub struct BeaconChain { pub event_handler: T::EventHandler, /// Used to track the heads of the beacon chain. pub(crate) head_tracker: HeadTracker, + /// Provides a small cache of `BeaconState` and `BeaconBlock`. + pub(crate) checkpoint_cache: CheckPointCache, /// Logging to CLI, etc. pub(crate) log: Logger, } @@ -264,11 +274,10 @@ impl BeaconChain { block_root: Hash256, ) -> Result, Error> { let block = self - .get_block(&block_root)? + .get_block_caching(&block_root)? .ok_or_else(|| Error::MissingBeaconBlock(block_root))?; let state = self - .store - .get_state(&block.state_root, Some(block.slot))? + .get_state_caching(&block.state_root, Some(block.slot))? .ok_or_else(|| Error::MissingBeaconState(block.state_root))?; let iter = BlockRootsIterator::owned(self.store.clone(), state); Ok(ReverseBlockRootIterator::new( @@ -349,6 +358,63 @@ impl BeaconChain { Ok(self.store.get(block_root)?) } + /// Returns the block at the given root, if any. + /// + /// ## Errors + /// + /// May return a database error. + fn get_block_caching( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + if let Some(block) = self.checkpoint_cache.get_block(block_root) { + Ok(Some(block)) + } else { + Ok(self.store.get(block_root)?) + } + } + + /// Returns the state at the given root, if any. + /// + /// ## Errors + /// + /// May return a database error. + fn get_state_caching( + &self, + state_root: &Hash256, + slot: Option, + ) -> Result>, Error> { + if let Some(state) = self.checkpoint_cache.get_state(state_root) { + Ok(Some(state)) + } else { + Ok(self.store.get_state(state_root, slot)?) + } + } + + /// Returns the state at the given root, if any. + /// + /// The return state does not contain any caches other than the committee caches. This method + /// is much faster than `Self::get_state_caching` because it does not clone the tree hash cache + /// when the state is found in the checkpoint cache. + /// + /// ## Errors + /// + /// May return a database error. + fn get_state_caching_only_with_committee_caches( + &self, + state_root: &Hash256, + slot: Option, + ) -> Result>, Error> { + if let Some(state) = self + .checkpoint_cache + .get_state_only_with_committee_cache(state_root) + { + Ok(Some(state)) + } else { + Ok(self.store.get_state(state_root, slot)?) + } + } + /// Returns a `Checkpoint` representing the head block and state. Contains the "best block"; /// the head of the canonical `BeaconChain`. /// @@ -359,6 +425,20 @@ impl BeaconChain { self.canonical_head.read().clone() } + /// Returns info representing the head block and state. + /// + /// A summarized version of `Self::head` that involves less cloning. + pub fn head_info(&self) -> HeadInfo { + let head = self.canonical_head.read(); + + HeadInfo { + slot: head.beacon_block.slot, + block_root: head.beacon_block_root, + state_root: head.beacon_state_root, + finalized_checkpoint: head.beacon_state.finalized_checkpoint.clone(), + } + } + /// Returns the current heads of the `BeaconChain`. For the canonical head, see `Self::head`. /// /// Returns `(block_root, block_slot)`. @@ -428,8 +508,7 @@ impl BeaconChain { .ok_or_else(|| Error::NoStateForSlot(slot))?; Ok(self - .store - .get_state(&state_root, Some(slot))? + .get_state_caching(&state_root, Some(slot))? .ok_or_else(|| Error::NoStateForSlot(slot))?) } } @@ -745,50 +824,16 @@ impl BeaconChain { // // An honest validator would have set this block to be the head of the chain (i.e., the // result of running fork choice). - let result = if let Some(attestation_head_block) = self - .store - .get::>(&attestation.data.beacon_block_root)? + let result = if let Some(attestation_head_block) = + self.get_block_caching(&attestation.data.beacon_block_root)? { - // Attempt to process the attestation using the `self.head()` state. - // - // This is purely an effort to avoid loading a `BeaconState` unnecessarily from the DB. - let state = &self.head().beacon_state; - - // If it turns out that the attestation was made using the head state, then there - // is no need to load a state from the database to process the attestation. - // - // Note: use the epoch of the target because it indicates which epoch the - // attestation was created in. You cannot use the epoch of the head block, because - // the block doesn't necessarily need to be in the same epoch as the attestation - // (e.g., if there are skip slots between the epoch the block was created in and - // the epoch for the attestation). - // - // This check also ensures that the slot for `data.beacon_block_root` is not higher - // than `state.root` by ensuring that the block is in the history of `state`. - if state.current_epoch() == attestation.data.target.epoch - && (attestation.data.beacon_block_root == self.head().beacon_block_root - || state - .get_block_root(attestation_head_block.slot) - .map(|root| *root == attestation.data.beacon_block_root) - .unwrap_or_else(|_| false)) - { - // The head state is able to be used to validate this attestation. No need to load - // anything from the database. - return self.process_attestation_for_state_and_block( - attestation.clone(), - state, - &attestation_head_block, - ); - } - // Use the `data.beacon_block_root` to load the state from the latest non-skipped // slot preceding the attestation's creation. // // This state is guaranteed to be in the same chain as the attestation, but it's // not guaranteed to be from the same slot or epoch as the attestation. let mut state: BeaconState = self - .store - .get_state( + .get_state_caching_only_with_committee_caches( &attestation_head_block.state_root, Some(attestation_head_block.slot), )? @@ -837,11 +882,7 @@ impl BeaconChain { // // This is likely overly restrictive, we could store the attestation for later // processing. - let head_epoch = self - .head() - .beacon_block - .slot - .epoch(T::EthSpec::slots_per_epoch()); + let head_epoch = self.head_info().slot.epoch(T::EthSpec::slots_per_epoch()); let attestation_epoch = attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()); // Only log a warning if our head is in a reasonable place to verify this attestation. @@ -903,7 +944,7 @@ impl BeaconChain { // - The highest valid finalized epoch we've ever seen (i.e., the head). // - The finalized epoch that this attestation was created against. let finalized_epoch = std::cmp::max( - self.head().beacon_state.finalized_checkpoint.epoch, + self.head_info().finalized_checkpoint.epoch, state.finalized_checkpoint.epoch, ); @@ -1110,8 +1151,7 @@ impl BeaconChain { let full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); let finalized_slot = self - .head() - .beacon_state + .head_info() .finalized_checkpoint .epoch .start_slot(T::EthSpec::slots_per_epoch()); @@ -1156,21 +1196,21 @@ impl BeaconChain { // Load the blocks parent block from the database, returning invalid if that block is not // found. - let parent_block: BeaconBlock = match self.store.get(&block.parent_root)? { - Some(block) => block, - None => { - return Ok(BlockProcessingOutcome::ParentUnknown { - parent: block.parent_root, - }); - } - }; + let parent_block: BeaconBlock = + match self.get_block_caching(&block.parent_root)? { + Some(block) => block, + None => { + return Ok(BlockProcessingOutcome::ParentUnknown { + parent: block.parent_root, + }); + } + }; // Load the parent blocks state from the database, returning an error if it is not found. // It is an error because if we know the parent block we should also know the parent state. let parent_state_root = parent_block.state_root; let parent_state = self - .store - .get_state(&parent_state_root, Some(parent_block.slot))? + .get_state_caching(&parent_state_root, Some(parent_block.slot))? .ok_or_else(|| { Error::DBInconsistent(format!("Missing state {:?}", parent_state_root)) })?; @@ -1187,7 +1227,8 @@ impl BeaconChain { // Transition the parent state to the block slot. let mut state: BeaconState = parent_state; - for i in state.slot.as_u64()..block.slot.as_u64() { + let distance = block.slot.as_u64().saturating_sub(state.slot.as_u64()); + for i in 0..distance { if i > 0 { intermediate_states.push(state.clone()); } @@ -1231,7 +1272,9 @@ impl BeaconChain { let state_root_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_STATE_ROOT); - let state_root = state.canonical_root(); + let state_root = state.update_tree_hash_cache()?; + + metrics::stop_timer(state_root_timer); write_state( &format!("state_post_block_{}", block_root), @@ -1246,8 +1289,6 @@ impl BeaconChain { }); } - metrics::stop_timer(state_root_timer); - let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); // Store all the states between the parent block state and this blocks slot before storing @@ -1315,6 +1356,18 @@ impl BeaconChain { &metrics::OPERATIONS_PER_BLOCK_ATTESTATION, block.body.attestations.len() as f64, ); + + // Store the block in the checkpoint cache. + // + // A block that was just imported is likely to be referenced by the next block that we + // import. + self.checkpoint_cache.insert(&CheckPoint { + beacon_block_root: block_root, + beacon_block: block, + beacon_state_root: state_root, + beacon_state: state, + }); + metrics::stop_timer(full_timer); Ok(BlockProcessingOutcome::Processed { block_root }) @@ -1410,7 +1463,7 @@ impl BeaconChain { &self.spec, )?; - let state_root = state.canonical_root(); + let state_root = state.update_tree_hash_cache()?; block.state_root = state_root; @@ -1439,24 +1492,22 @@ impl BeaconChain { let beacon_block_root = self.fork_choice.find_head(&self)?; // If a new head was chosen. - let result = if beacon_block_root != self.head().beacon_block_root { + let result = if beacon_block_root != self.head_info().block_root { metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD); let beacon_block: BeaconBlock = self - .store - .get(&beacon_block_root)? + .get_block_caching(&beacon_block_root)? .ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?; let beacon_state_root = beacon_block.state_root; let beacon_state: BeaconState = self - .store - .get_state(&beacon_state_root, Some(beacon_block.slot))? + .get_state_caching(&beacon_state_root, Some(beacon_block.slot))? .ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?; - let previous_slot = self.head().beacon_block.slot; + let previous_slot = self.head_info().slot; let new_slot = beacon_block.slot; - let is_reorg = self.head().beacon_block_root != beacon_block.parent_root; + let is_reorg = self.head_info().block_root != beacon_block.parent_root; // If we switched to a new chain (instead of building atop the present chain). if is_reorg { @@ -1464,7 +1515,7 @@ impl BeaconChain { warn!( self.log, "Beacon chain re-org"; - "previous_head" => format!("{}", self.head().beacon_block_root), + "previous_head" => format!("{}", self.head_info().block_root), "previous_slot" => previous_slot, "new_head_parent" => format!("{}", beacon_block.parent_root), "new_head" => format!("{}", beacon_block_root), @@ -1483,7 +1534,7 @@ impl BeaconChain { ); }; - let old_finalized_epoch = self.head().beacon_state.finalized_checkpoint.epoch; + let old_finalized_epoch = self.head_info().finalized_checkpoint.epoch; let new_finalized_epoch = beacon_state.finalized_checkpoint.epoch; let finalized_root = beacon_state.finalized_checkpoint.root; @@ -1508,6 +1559,11 @@ impl BeaconChain { let timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES); + // Store the head in the checkpoint cache. + // + // The head block is likely to be referenced by the next imported block. + self.checkpoint_cache.insert(&new_head); + // Update the checkpoint that stores the head of the chain at the time it received the // block. *self.canonical_head.write() = new_head; diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index ee04b43f4..da23e53dd 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,3 +1,4 @@ +use crate::checkpoint_cache::CheckPointCache; use crate::eth1_chain::CachingEth1Backend; use crate::events::NullEventHandler; use crate::head_tracker::HeadTracker; @@ -374,6 +375,7 @@ where .event_handler .ok_or_else(|| "Cannot build without an event handler".to_string())?, head_tracker: self.head_tracker.unwrap_or_default(), + checkpoint_cache: CheckPointCache::default(), log: log.clone(), }; diff --git a/beacon_node/beacon_chain/src/checkpoint_cache.rs b/beacon_node/beacon_chain/src/checkpoint_cache.rs new file mode 100644 index 000000000..73e9746ae --- /dev/null +++ b/beacon_node/beacon_chain/src/checkpoint_cache.rs @@ -0,0 +1,124 @@ +use crate::checkpoint::CheckPoint; +use crate::metrics; +use parking_lot::RwLock; +use types::{BeaconBlock, BeaconState, EthSpec, Hash256}; + +const CACHE_SIZE: usize = 4; + +struct Inner { + oldest: usize, + limit: usize, + checkpoints: Vec>, +} + +impl Default for Inner { + fn default() -> Self { + Self { + oldest: 0, + limit: CACHE_SIZE, + checkpoints: vec![], + } + } +} + +pub struct CheckPointCache { + inner: RwLock>, +} + +impl Default for CheckPointCache { + fn default() -> Self { + Self { + inner: RwLock::new(Inner::default()), + } + } +} + +impl CheckPointCache { + pub fn insert(&self, checkpoint: &CheckPoint) { + if self + .inner + .read() + .checkpoints + .iter() + // This is `O(n)` but whilst `n == 4` it ain't no thing. + .any(|local| local.beacon_state_root == checkpoint.beacon_state_root) + { + // Adding a known checkpoint to the cache should be a no-op. + return; + } + + let mut inner = self.inner.write(); + + if inner.checkpoints.len() < inner.limit { + inner.checkpoints.push(checkpoint.clone()) + } else { + let i = inner.oldest; // to satisfy the borrow checker. + inner.checkpoints[i] = checkpoint.clone(); + inner.oldest += 1; + inner.oldest %= inner.limit; + } + } + + pub fn get_state(&self, state_root: &Hash256) -> Option> { + self.inner + .read() + .checkpoints + .iter() + // Also `O(n)`. + .find(|checkpoint| checkpoint.beacon_state_root == *state_root) + .map(|checkpoint| { + metrics::inc_counter(&metrics::CHECKPOINT_CACHE_HITS); + + checkpoint.beacon_state.clone() + }) + .or_else(|| { + metrics::inc_counter(&metrics::CHECKPOINT_CACHE_MISSES); + + None + }) + } + + pub fn get_state_only_with_committee_cache( + &self, + state_root: &Hash256, + ) -> Option> { + self.inner + .read() + .checkpoints + .iter() + // Also `O(n)`. + .find(|checkpoint| checkpoint.beacon_state_root == *state_root) + .map(|checkpoint| { + metrics::inc_counter(&metrics::CHECKPOINT_CACHE_HITS); + + let mut state = checkpoint.beacon_state.clone_without_caches(); + state.committee_caches = checkpoint.beacon_state.committee_caches.clone(); + + state + }) + .or_else(|| { + metrics::inc_counter(&metrics::CHECKPOINT_CACHE_MISSES); + + None + }) + } + + pub fn get_block(&self, block_root: &Hash256) -> Option> { + self.inner + .read() + .checkpoints + .iter() + // Also `O(n)`. + .find(|checkpoint| checkpoint.beacon_block_root == *block_root) + .map(|checkpoint| { + metrics::inc_counter(&metrics::CHECKPOINT_CACHE_HITS); + + checkpoint.beacon_block.clone() + }) + .or_else(|| { + metrics::inc_counter(&metrics::CHECKPOINT_CACHE_MISSES); + + None + }) + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 1112a6dd9..d9e60fac8 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -5,6 +5,7 @@ extern crate lazy_static; mod beacon_chain; pub mod builder; mod checkpoint; +mod checkpoint_cache; mod errors; pub mod eth1_chain; pub mod events; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index bd1742b58..e5dd4af59 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -149,6 +149,14 @@ lazy_static! { pub static ref PERSIST_CHAIN: Result = try_create_histogram("beacon_persist_chain", "Time taken to update the canonical head"); + /* + * Checkpoint cache + */ + pub static ref CHECKPOINT_CACHE_HITS: Result = + try_create_int_counter("beacon_checkpoint_cache_hits_total", "Count of times checkpoint cache fulfils request"); + pub static ref CHECKPOINT_CACHE_MISSES: Result = + try_create_int_counter("beacon_checkpoint_cache_misses_total", "Count of times checkpoint cache fulfils request"); + /* * Chain Head */ diff --git a/beacon_node/store/src/impls/beacon_state.rs b/beacon_node/store/src/impls/beacon_state.rs index a2f163f5b..022bb67b0 100644 --- a/beacon_node/store/src/impls/beacon_state.rs +++ b/beacon_node/store/src/impls/beacon_state.rs @@ -2,7 +2,7 @@ use crate::*; use ssz::{Decode, DecodeError, Encode}; use ssz_derive::{Decode, Encode}; use std::convert::TryInto; -use types::beacon_state::{BeaconTreeHashCache, CommitteeCache, CACHED_EPOCHS}; +use types::beacon_state::{CommitteeCache, CACHED_EPOCHS}; pub fn store_full_state, E: EthSpec>( store: &S, @@ -47,27 +47,14 @@ pub fn get_full_state, E: EthSpec>( pub struct StorageContainer { state: BeaconState, committee_caches: Vec, - tree_hash_cache: BeaconTreeHashCache, } impl StorageContainer { /// Create a new instance for storing a `BeaconState`. pub fn new(state: &BeaconState) -> Self { - let mut state = state.clone(); - - let mut committee_caches = vec![CommitteeCache::default(); CACHED_EPOCHS]; - - for i in 0..CACHED_EPOCHS { - std::mem::swap(&mut state.committee_caches[i], &mut committee_caches[i]); - } - - let tree_hash_cache = - std::mem::replace(&mut state.tree_hash_cache, BeaconTreeHashCache::default()); - Self { - state, - committee_caches, - tree_hash_cache, + state: state.clone_without_caches(), + committee_caches: state.committee_caches.to_vec(), } } } @@ -88,8 +75,6 @@ impl TryInto> for StorageContainer { state.committee_caches[i] = self.committee_caches.remove(i); } - state.tree_hash_cache = self.tree_hash_cache; - Ok(state) } } diff --git a/eth2/types/benches/benches.rs b/eth2/types/benches/benches.rs index 10d9df7ec..dd66c7174 100644 --- a/eth2/types/benches/benches.rs +++ b/eth2/types/benches/benches.rs @@ -3,7 +3,7 @@ use criterion::{black_box, criterion_group, criterion_main, Benchmark}; use rayon::prelude::*; use ssz::{Decode, Encode}; use types::{ - test_utils::generate_deterministic_keypair, BeaconState, Eth1Data, EthSpec, Hash256, + test_utils::generate_deterministic_keypair, BeaconState, Epoch, Eth1Data, EthSpec, Hash256, MainnetEthSpec, Validator, }; @@ -28,12 +28,12 @@ fn get_state(validator_count: usize) -> BeaconState { .map(|&i| Validator { pubkey: generate_deterministic_keypair(i).pk.into(), withdrawal_credentials: Hash256::from_low_u64_le(i as u64), - effective_balance: i as u64, - slashed: i % 2 == 0, - activation_eligibility_epoch: i.into(), - activation_epoch: i.into(), - exit_epoch: i.into(), - withdrawable_epoch: i.into(), + effective_balance: spec.max_effective_balance, + slashed: false, + activation_eligibility_epoch: Epoch::new(0), + activation_epoch: Epoch::new(0), + exit_epoch: Epoch::from(u64::max_value()), + withdrawable_epoch: Epoch::from(u64::max_value()), }) .collect::>() .into(); @@ -43,14 +43,18 @@ fn get_state(validator_count: usize) -> BeaconState { fn all_benches(c: &mut Criterion) { let validator_count = 16_384; - let state = get_state::(validator_count); + let spec = &MainnetEthSpec::default_spec(); + + let mut state = get_state::(validator_count); + state.build_all_caches(spec).expect("should build caches"); let state_bytes = state.as_ssz_bytes(); + let inner_state = state.clone(); c.bench( &format!("{}_validators", validator_count), Benchmark::new("encode/beacon_state", move |b| { b.iter_batched_ref( - || state.clone(), + || inner_state.clone(), |state| black_box(state.as_ssz_bytes()), criterion::BatchSize::SmallInput, ) @@ -73,6 +77,32 @@ fn all_benches(c: &mut Criterion) { }) .sample_size(10), ); + + let inner_state = state.clone(); + c.bench( + &format!("{}_validators", validator_count), + Benchmark::new("clone/beacon_state", move |b| { + b.iter_batched_ref( + || inner_state.clone(), + |state| black_box(state.clone()), + criterion::BatchSize::SmallInput, + ) + }) + .sample_size(10), + ); + + let inner_state = state.clone(); + c.bench( + &format!("{}_validators", validator_count), + Benchmark::new("clone_without_caches/beacon_state", move |b| { + b.iter_batched_ref( + || inner_state.clone(), + |state| black_box(state.clone_without_caches()), + criterion::BatchSize::SmallInput, + ) + }) + .sample_size(10), + ); } criterion_group!(benches, all_benches,); diff --git a/eth2/types/src/beacon_state.rs b/eth2/types/src/beacon_state.rs index 460514464..4e31d9266 100644 --- a/eth2/types/src/beacon_state.rs +++ b/eth2/types/src/beacon_state.rs @@ -909,6 +909,39 @@ impl BeaconState { pub fn drop_tree_hash_cache(&mut self) { self.tree_hash_cache = BeaconTreeHashCache::default(); } + + pub fn clone_without_caches(&self) -> Self { + BeaconState { + genesis_time: self.genesis_time, + slot: self.slot, + fork: self.fork.clone(), + latest_block_header: self.latest_block_header.clone(), + block_roots: self.block_roots.clone(), + state_roots: self.state_roots.clone(), + historical_roots: self.historical_roots.clone(), + eth1_data: self.eth1_data.clone(), + eth1_data_votes: self.eth1_data_votes.clone(), + eth1_deposit_index: self.eth1_deposit_index, + validators: self.validators.clone(), + balances: self.balances.clone(), + randao_mixes: self.randao_mixes.clone(), + slashings: self.slashings.clone(), + previous_epoch_attestations: self.previous_epoch_attestations.clone(), + current_epoch_attestations: self.current_epoch_attestations.clone(), + justification_bits: self.justification_bits.clone(), + previous_justified_checkpoint: self.previous_justified_checkpoint.clone(), + current_justified_checkpoint: self.current_justified_checkpoint.clone(), + finalized_checkpoint: self.finalized_checkpoint.clone(), + committee_caches: [ + CommitteeCache::default(), + CommitteeCache::default(), + CommitteeCache::default(), + ], + pubkey_cache: PubkeyCache::default(), + exit_cache: ExitCache::default(), + tree_hash_cache: BeaconTreeHashCache::default(), + } + } } impl From for Error {