diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3da579cfe..d71438db6 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1,4 +1,3 @@ -use crate::checkpoint::CheckPoint; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::{EventHandler, EventKind}; @@ -7,8 +6,10 @@ use crate::head_tracker::HeadTracker; use crate::metrics; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::ShufflingCache; +use crate::snapshot_cache::SnapshotCache; use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_pubkey_cache::ValidatorPubkeyCache; +use crate::BeaconSnapshot; use operation_pool::{OperationPool, PersistedOperationPool}; use slog::{debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; @@ -54,6 +55,9 @@ const MAXIMUM_BLOCK_SLOT_NUMBER: u64 = 4_294_967_296; // 2^32 /// head. const HEAD_LOCK_TIMEOUT: Duration = Duration::from_secs(1); +/// The time-out before failure during an operation to take a read/write RwLock on the block +/// processing cache. +const BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1); /// The time-out before failure during an operation to take a read/write RwLock on the /// attestation cache. const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1); @@ -182,7 +186,7 @@ pub struct BeaconChain { /// Provides information from the Ethereum 1 (PoW) chain. pub eth1_chain: Option>, /// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received. - pub(crate) canonical_head: TimeoutRwLock>, + pub(crate) canonical_head: TimeoutRwLock>, /// The root of the genesis block. pub genesis_block_root: Hash256, /// A state-machine that is updated with information from the network and chooses a canonical @@ -192,6 +196,8 @@ pub struct BeaconChain { pub event_handler: T::EventHandler, /// Used to track the heads of the beacon chain. pub(crate) head_tracker: HeadTracker, + /// A cache dedicated to block processing. + pub(crate) block_processing_cache: TimeoutRwLock>, /// Caches the shuffling for a given epoch and state root. pub(crate) shuffling_cache: TimeoutRwLock, /// Caches a map of `validator_index -> validator_pubkey`. @@ -444,34 +450,13 @@ impl BeaconChain { Ok(self.store.get_state(state_root, slot)?) } - /// Returns the state at the given root, if any. - /// - /// The return state does not contain any caches other than the committee caches. This method - /// is much faster than `Self::get_state` because it does not clone the tree hash cache - /// when the state is found in the cache. - /// - /// ## Errors - /// - /// May return a database error. - pub fn get_state_caching_only_with_committee_caches( - &self, - state_root: &Hash256, - slot: Option, - ) -> Result>, Error> { - Ok(self.store.get_state_with( - state_root, - slot, - types::beacon_state::CloneConfig::committee_caches_only(), - )?) - } - /// Returns a `Checkpoint` representing the head block and state. Contains the "best block"; /// the head of the canonical `BeaconChain`. /// /// It is important to note that the `beacon_state` returned may not match the present slot. It /// is the state as it was when the head block was received, which could be some slots prior to /// now. - pub fn head(&self) -> Result, Error> { + pub fn head(&self) -> Result, Error> { self.canonical_head .try_read_for(HEAD_LOCK_TIMEOUT) .ok_or_else(|| Error::CanonicalHeadLockTimeout) @@ -703,7 +688,7 @@ impl BeaconChain { drop(head); let mut state = self - .get_state_caching_only_with_committee_caches(&state_root, Some(slot))? + .get_state(&state_root, Some(slot))? .ok_or_else(|| Error::MissingBeaconState(state_root))?; state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; @@ -967,10 +952,7 @@ impl BeaconChain { metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES); let mut state = self - .get_state_caching_only_with_committee_caches( - &target_block_state_root, - Some(target_block_slot), - )? + .get_state(&target_block_state_root, Some(target_block_slot))? .ok_or_else(|| Error::MissingBeaconState(target_block_state_root))?; metrics::stop_timer(state_read_timer); @@ -1305,26 +1287,39 @@ impl BeaconChain { // processing. let db_read_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_READ); - // Load the blocks parent block from the database, returning invalid if that block is not - // found. - let parent_block = match self.get_block(&block.parent_root)? { - Some(block) => block, - None => { - return Ok(BlockProcessingOutcome::ParentUnknown { - parent: block.parent_root, - reference_location: "database", - }); - } - }; + let cached_snapshot = self + .block_processing_cache + .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .and_then(|mut block_processing_cache| { + block_processing_cache.try_remove(block.parent_root) + }); - // Load the parent blocks state from the database, returning an error if it is not found. - // It is an error because if we know the parent block we should also know the parent state. - let parent_state_root = parent_block.state_root(); - let parent_state = self - .get_state(&parent_state_root, Some(parent_block.slot()))? - .ok_or_else(|| { - Error::DBInconsistent(format!("Missing state {:?}", parent_state_root)) - })?; + let (parent_block, parent_state) = if let Some(snapshot) = cached_snapshot { + (snapshot.beacon_block, snapshot.beacon_state) + } else { + // Load the blocks parent block from the database, returning invalid if that block is not + // found. + let parent_block = match self.get_block(&block.parent_root)? { + Some(block) => block, + None => { + return Ok(BlockProcessingOutcome::ParentUnknown { + parent: block.parent_root, + reference_location: "database", + }); + } + }; + + // Load the parent blocks state from the database, returning an error if it is not found. + // It is an error because if we know the parent block we should also know the parent state. + let parent_state_root = parent_block.state_root(); + let parent_state = self + .get_state(&parent_state_root, Some(parent_block.slot()))? + .ok_or_else(|| { + Error::DBInconsistent(format!("Missing state {:?}", parent_state_root)) + })?; + + (parent_block, parent_state) + }; metrics::stop_timer(db_read_timer); @@ -1479,8 +1474,27 @@ impl BeaconChain { // solution would be to use a database transaction (once our choice of database and API // settles down). // See: https://github.com/sigp/lighthouse/issues/692 - self.store.put_state(&state_root, state)?; - self.store.put_block(&block_root, signed_block)?; + self.store.put_state(&state_root, &state)?; + self.store.put_block(&block_root, signed_block.clone())?; + + self.block_processing_cache + .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .map(|mut block_processing_cache| { + block_processing_cache.insert(BeaconSnapshot { + beacon_block: signed_block, + beacon_block_root: block_root, + beacon_state: state, + beacon_state_root: state_root, + }); + }) + .unwrap_or_else(|| { + error!( + self.log, + "Failed to obtain cache write lock"; + "lock" => "block_processing_cache", + "task" => "process block" + ); + }); metrics::stop_timer(db_write_timer); @@ -1612,134 +1626,163 @@ impl BeaconChain { /// Execute the fork choice algorithm and enthrone the result as the canonical head. pub fn fork_choice(&self) -> Result<(), Error> { metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS); + let overall_timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES); - // Start fork choice metrics timer. - let timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES); - - // Determine the root of the block that is the head of the chain. - let beacon_block_root = self.fork_choice.find_head(&self)?; - - // If a new head was chosen. - let result = if beacon_block_root != self.head_info()?.block_root { - metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD); - - let beacon_block = self - .get_block(&beacon_block_root)? - .ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?; - - let beacon_state_root = beacon_block.state_root(); - let beacon_state: BeaconState = self - .get_state(&beacon_state_root, Some(beacon_block.slot()))? - .ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?; - - let previous_slot = self.head_info()?.slot; - let new_slot = beacon_block.slot(); - - // Note: this will declare a re-org if we skip `SLOTS_PER_HISTORICAL_ROOT` blocks - // between calls to fork choice without swapping between chains. This seems like an - // extreme-enough scenario that a warning is fine. - let is_reorg = self.head_info()?.block_root - != beacon_state - .get_block_root(self.head_info()?.slot) - .map(|root| *root) - .unwrap_or_else(|_| Hash256::random()); - - // If we switched to a new chain (instead of building atop the present chain). - if is_reorg { - metrics::inc_counter(&metrics::FORK_CHOICE_REORG_COUNT); - warn!( - self.log, - "Beacon chain re-org"; - "previous_head" => format!("{}", self.head_info()?.block_root), - "previous_slot" => previous_slot, - "new_head_parent" => format!("{}", beacon_block.parent_root()), - "new_head" => format!("{}", beacon_block_root), - "new_slot" => new_slot - ); - } else { - debug!( - self.log, - "Head beacon block"; - "justified_root" => format!("{}", beacon_state.current_justified_checkpoint.root), - "justified_epoch" => beacon_state.current_justified_checkpoint.epoch, - "finalized_root" => format!("{}", beacon_state.finalized_checkpoint.root), - "finalized_epoch" => beacon_state.finalized_checkpoint.epoch, - "root" => format!("{}", beacon_block_root), - "slot" => new_slot, - ); - }; - - let old_finalized_epoch = self.head_info()?.finalized_checkpoint.epoch; - let new_finalized_epoch = beacon_state.finalized_checkpoint.epoch; - let finalized_root = beacon_state.finalized_checkpoint.root; - - // Never revert back past a finalized epoch. - if new_finalized_epoch < old_finalized_epoch { - Err(Error::RevertedFinalizedEpoch { - previous_epoch: old_finalized_epoch, - new_epoch: new_finalized_epoch, - }) - } else { - let previous_head_beacon_block_root = self - .canonical_head - .try_read_for(HEAD_LOCK_TIMEOUT) - .ok_or_else(|| Error::CanonicalHeadLockTimeout)? - .beacon_block_root; - let current_head_beacon_block_root = beacon_block_root; - - let mut new_head = CheckPoint { - beacon_block, - beacon_block_root, - beacon_state, - beacon_state_root, - }; - - new_head.beacon_state.build_all_caches(&self.spec)?; - - let timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES); - - // Update the checkpoint that stores the head of the chain at the time it received the - // block. - *self - .canonical_head - .try_write_for(HEAD_LOCK_TIMEOUT) - .ok_or_else(|| Error::CanonicalHeadLockTimeout)? = new_head; - - metrics::stop_timer(timer); - - if previous_slot.epoch(T::EthSpec::slots_per_epoch()) - < new_slot.epoch(T::EthSpec::slots_per_epoch()) - || is_reorg - { - self.persist_head_and_fork_choice()?; - } - - let _ = self.event_handler.register(EventKind::BeaconHeadChanged { - reorg: is_reorg, - previous_head_beacon_block_root, - current_head_beacon_block_root, - }); - - if new_finalized_epoch != old_finalized_epoch { - self.after_finalization(old_finalized_epoch, finalized_root)?; - } - - Ok(()) - } - } else { - Ok(()) - }; - - // End fork choice metrics timer. - metrics::stop_timer(timer); + let result = self.fork_choice_internal(); if result.is_err() { metrics::inc_counter(&metrics::FORK_CHOICE_ERRORS); } + metrics::stop_timer(overall_timer); + result } + fn fork_choice_internal(&self) -> Result<(), Error> { + // Determine the root of the block that is the head of the chain. + let beacon_block_root = self.fork_choice.find_head(&self)?; + + let current_head = self.head_info()?; + + if beacon_block_root == current_head.block_root { + return Ok(()); + } + + // At this point we know that the new head block is not the same as the previous one + metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD); + + // Try and obtain the snapshot for `beacon_block_root` from the snapshot cache, falling + // back to a database read if that fails. + let new_head = self + .block_processing_cache + .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .and_then(|block_processing_cache| block_processing_cache.get_cloned(beacon_block_root)) + .map::, _>(|snapshot| Ok(snapshot)) + .unwrap_or_else(|| { + let beacon_block = self + .get_block(&beacon_block_root)? + .ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?; + + let beacon_state_root = beacon_block.state_root(); + let beacon_state: BeaconState = self + .get_state(&beacon_state_root, Some(beacon_block.slot()))? + .ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?; + + Ok(BeaconSnapshot { + beacon_block, + beacon_block_root, + beacon_state, + beacon_state_root, + }) + }) + .and_then(|mut snapshot| { + // Regardless of where we got the state from, attempt to build the committee + // caches. + snapshot + .beacon_state + .build_all_committee_caches(&self.spec) + .map_err(Into::into) + .map(|()| snapshot) + })?; + + // Attempt to detect if the new head is not on the same chain as the previous block + // (i.e., a re-org). + // + // Note: this will declare a re-org if we skip `SLOTS_PER_HISTORICAL_ROOT` blocks + // between calls to fork choice without swapping between chains. This seems like an + // extreme-enough scenario that a warning is fine. + let is_reorg = current_head.block_root + != new_head + .beacon_state + .get_block_root(current_head.slot) + .map(|root| *root) + .unwrap_or_else(|_| Hash256::random()); + + if is_reorg { + metrics::inc_counter(&metrics::FORK_CHOICE_REORG_COUNT); + warn!( + self.log, + "Beacon chain re-org"; + "previous_head" => format!("{}", current_head.block_root), + "previous_slot" => current_head.slot, + "new_head_parent" => format!("{}", new_head.beacon_block.parent_root()), + "new_head" => format!("{}", beacon_block_root), + "new_slot" => new_head.beacon_block.slot() + ); + } else { + debug!( + self.log, + "Head beacon block"; + "justified_root" => format!("{}", new_head.beacon_state.current_justified_checkpoint.root), + "justified_epoch" => new_head.beacon_state.current_justified_checkpoint.epoch, + "finalized_root" => format!("{}", new_head.beacon_state.finalized_checkpoint.root), + "finalized_epoch" => new_head.beacon_state.finalized_checkpoint.epoch, + "root" => format!("{}", beacon_block_root), + "slot" => new_head.beacon_block.slot(), + ); + }; + + let old_finalized_epoch = current_head.finalized_checkpoint.epoch; + let new_finalized_epoch = new_head.beacon_state.finalized_checkpoint.epoch; + let finalized_root = new_head.beacon_state.finalized_checkpoint.root; + + // It is an error to try to update to a head with a lesser finalized epoch. + if new_finalized_epoch < old_finalized_epoch { + return Err(Error::RevertedFinalizedEpoch { + previous_epoch: old_finalized_epoch, + new_epoch: new_finalized_epoch, + }); + } + + if current_head.slot.epoch(T::EthSpec::slots_per_epoch()) + < new_head + .beacon_state + .slot + .epoch(T::EthSpec::slots_per_epoch()) + || is_reorg + { + self.persist_head_and_fork_choice()?; + } + + let update_head_timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES); + + // Update the snapshot that stores the head of the chain at the time it received the + // block. + *self + .canonical_head + .try_write_for(HEAD_LOCK_TIMEOUT) + .ok_or_else(|| Error::CanonicalHeadLockTimeout)? = new_head; + + metrics::stop_timer(update_head_timer); + + self.block_processing_cache + .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .map(|mut block_processing_cache| { + block_processing_cache.update_head(beacon_block_root); + }) + .unwrap_or_else(|| { + error!( + self.log, + "Failed to obtain cache write lock"; + "lock" => "block_processing_cache", + "task" => "update head" + ); + }); + + if new_finalized_epoch != old_finalized_epoch { + self.after_finalization(old_finalized_epoch, finalized_root)?; + } + + let _ = self.event_handler.register(EventKind::BeaconHeadChanged { + reorg: is_reorg, + previous_head_beacon_block_root: current_head.block_root, + current_head_beacon_block_root: beacon_block_root, + }); + + Ok(()) + } + /// Called after `self` has had a new block finalized. /// /// Performs pruning and finality-based optimizations. @@ -1764,11 +1807,22 @@ impl BeaconChain { } else { self.fork_choice.prune()?; + self.block_processing_cache + .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .map(|mut block_processing_cache| { + block_processing_cache.prune(new_finalized_epoch); + }) + .unwrap_or_else(|| { + error!( + self.log, + "Failed to obtain cache write lock"; + "lock" => "block_processing_cache", + "task" => "prune" + ); + }); + let finalized_state = self - .get_state_caching_only_with_committee_caches( - &finalized_block.state_root, - Some(finalized_block.slot), - )? + .get_state(&finalized_block.state_root, Some(finalized_block.slot))? .ok_or_else(|| Error::MissingBeaconState(finalized_block.state_root))?; self.op_pool.prune_all(&finalized_state, &self.spec); @@ -1801,10 +1855,10 @@ impl BeaconChain { /// /// This could be a very expensive operation and should only be done in testing/analysis /// activities. - pub fn chain_dump(&self) -> Result>, Error> { + pub fn chain_dump(&self) -> Result>, Error> { let mut dump = vec![]; - let mut last_slot = CheckPoint { + let mut last_slot = BeaconSnapshot { beacon_block: self.head()?.beacon_block, beacon_block_root: self.head()?.beacon_block_root, beacon_state: self.head()?.beacon_state, @@ -1831,7 +1885,7 @@ impl BeaconChain { Error::DBInconsistent(format!("Missing state {:?}", beacon_state_root)) })?; - let slot = CheckPoint { + let slot = BeaconSnapshot { beacon_block, beacon_block_root, beacon_state, diff --git a/beacon_node/beacon_chain/src/checkpoint.rs b/beacon_node/beacon_chain/src/beacon_snapshot.rs similarity index 95% rename from beacon_node/beacon_chain/src/checkpoint.rs rename to beacon_node/beacon_chain/src/beacon_snapshot.rs index 79d0f94b7..377850b3a 100644 --- a/beacon_node/beacon_chain/src/checkpoint.rs +++ b/beacon_node/beacon_chain/src/beacon_snapshot.rs @@ -5,14 +5,14 @@ use types::{BeaconState, EthSpec, Hash256, SignedBeaconBlock}; /// Represents some block and its associated state. Generally, this will be used for tracking the /// head, justified head and finalized head. #[derive(Clone, Serialize, PartialEq, Debug, Encode, Decode)] -pub struct CheckPoint { +pub struct BeaconSnapshot { pub beacon_block: SignedBeaconBlock, pub beacon_block_root: Hash256, pub beacon_state: BeaconState, pub beacon_state_root: Hash256, } -impl CheckPoint { +impl BeaconSnapshot { /// Create a new checkpoint. pub fn new( beacon_block: SignedBeaconBlock, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 0090e1530..c313b797d 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -7,10 +7,11 @@ use crate::fork_choice::SszForkChoice; use crate::head_tracker::HeadTracker; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::ShufflingCache; +use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE}; use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ - BeaconChain, BeaconChainTypes, CheckPoint, Eth1Chain, Eth1ChainBackend, EventHandler, + BeaconChain, BeaconChainTypes, BeaconSnapshot, Eth1Chain, Eth1ChainBackend, EventHandler, ForkChoice, }; use eth1::Config as Eth1Config; @@ -71,10 +72,10 @@ where pub struct BeaconChainBuilder { store: Option>, store_migrator: Option, - canonical_head: Option>, + canonical_head: Option>, /// The finalized checkpoint to anchor the chain. May be genesis or a higher /// checkpoint. - pub finalized_checkpoint: Option>, + pub finalized_snapshot: Option>, genesis_block_root: Option, op_pool: Option>, fork_choice: Option>, @@ -110,7 +111,7 @@ where store: None, store_migrator: None, canonical_head: None, - finalized_checkpoint: None, + finalized_snapshot: None, genesis_block_root: None, op_pool: None, fork_choice: None, @@ -260,14 +261,14 @@ where .map_err(|e| format!("DB error when reading finalized state: {:?}", e))? .ok_or_else(|| "Finalized state not found in store".to_string())?; - self.finalized_checkpoint = Some(CheckPoint { + self.finalized_snapshot = Some(BeaconSnapshot { beacon_block_root: finalized_block_root, beacon_block: finalized_block, beacon_state_root: finalized_state_root, beacon_state: finalized_state, }); - self.canonical_head = Some(CheckPoint { + self.canonical_head = Some(BeaconSnapshot { beacon_block_root: head_block_root, beacon_block: head_block, beacon_state_root: head_state_root, @@ -304,7 +305,7 @@ where self.genesis_block_root = Some(beacon_block_root); store - .put_state(&beacon_state_root, beacon_state.clone()) + .put_state(&beacon_state_root, &beacon_state) .map_err(|e| format!("Failed to store genesis state: {:?}", e))?; store .put(&beacon_block_root, &beacon_block) @@ -318,7 +319,7 @@ where ) })?; - self.finalized_checkpoint = Some(CheckPoint { + self.finalized_snapshot = Some(BeaconSnapshot { beacon_block_root, beacon_block, beacon_state_root, @@ -380,7 +381,7 @@ where let mut canonical_head = if let Some(head) = self.canonical_head { head } else { - self.finalized_checkpoint + self.finalized_snapshot .ok_or_else(|| "Cannot build without a state".to_string())? }; @@ -420,7 +421,7 @@ where .op_pool .ok_or_else(|| "Cannot build without op pool".to_string())?, eth1_chain: self.eth1_chain, - canonical_head: TimeoutRwLock::new(canonical_head), + canonical_head: TimeoutRwLock::new(canonical_head.clone()), genesis_block_root: self .genesis_block_root .ok_or_else(|| "Cannot build without a genesis block root".to_string())?, @@ -431,6 +432,10 @@ where .event_handler .ok_or_else(|| "Cannot build without an event handler".to_string())?, head_tracker: self.head_tracker.unwrap_or_default(), + block_processing_cache: TimeoutRwLock::new(SnapshotCache::new( + DEFAULT_SNAPSHOT_CACHE_SIZE, + canonical_head, + )), shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()), validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), log: log.clone(), @@ -482,30 +487,30 @@ where ForkChoice::from_ssz_container(persisted) .map_err(|e| format!("Unable to read persisted fork choice from disk: {:?}", e))? } else { - let finalized_checkpoint = &self - .finalized_checkpoint + let finalized_snapshot = &self + .finalized_snapshot .as_ref() - .ok_or_else(|| "fork_choice_backend requires a finalized_checkpoint")?; + .ok_or_else(|| "fork_choice_backend requires a finalized_snapshot")?; let genesis_block_root = self .genesis_block_root .ok_or_else(|| "fork_choice_backend requires a genesis_block_root")?; let backend = ProtoArrayForkChoice::new( - finalized_checkpoint.beacon_block.message.slot, - finalized_checkpoint.beacon_block.message.state_root, + finalized_snapshot.beacon_block.message.slot, + finalized_snapshot.beacon_block.message.state_root, // Note: here we set the `justified_epoch` to be the same as the epoch of the // finalized checkpoint. Whilst this finalized checkpoint may actually point to // a _later_ justified checkpoint, that checkpoint won't yet exist in the fork // choice. - finalized_checkpoint.beacon_state.current_epoch(), - finalized_checkpoint.beacon_state.current_epoch(), - finalized_checkpoint.beacon_block_root, + finalized_snapshot.beacon_state.current_epoch(), + finalized_snapshot.beacon_state.current_epoch(), + finalized_snapshot.beacon_block_root, )?; ForkChoice::new( backend, genesis_block_root, - &finalized_checkpoint.beacon_state, + &finalized_snapshot.beacon_state, ) }; @@ -576,7 +581,7 @@ where /// Requires the state to be initialized. pub fn testing_slot_clock(self, slot_duration: Duration) -> Result { let genesis_time = self - .finalized_checkpoint + .finalized_snapshot .as_ref() .ok_or_else(|| "testing_slot_clock requires an initialized state")? .beacon_state diff --git a/beacon_node/beacon_chain/src/fork_choice/checkpoint_manager.rs b/beacon_node/beacon_chain/src/fork_choice/checkpoint_manager.rs index fd4512ce3..d93ddeb0f 100644 --- a/beacon_node/beacon_chain/src/fork_choice/checkpoint_manager.rs +++ b/beacon_node/beacon_chain/src/fork_choice/checkpoint_manager.rs @@ -306,10 +306,7 @@ impl CheckpointManager { .ok_or_else(|| Error::UnknownJustifiedBlock(block_root))?; let state = chain - .get_state_caching_only_with_committee_caches( - &block.state_root(), - Some(block.slot()), - )? + .get_state(&block.state_root(), Some(block.slot()))? .ok_or_else(|| Error::UnknownJustifiedState(block.state_root()))?; Ok(get_effective_balances(&state)) diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index cc4a9b8f5..1b4297faa 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -3,8 +3,8 @@ extern crate lazy_static; mod beacon_chain; +mod beacon_snapshot; pub mod builder; -mod checkpoint; mod errors; pub mod eth1_chain; pub mod events; @@ -13,6 +13,7 @@ mod head_tracker; mod metrics; mod persisted_beacon_chain; mod shuffling_cache; +mod snapshot_cache; pub mod test_utils; mod timeout_rw_lock; mod validator_pubkey_cache; @@ -21,7 +22,7 @@ pub use self::beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockProcessingOutcome, StateSkipConfig, }; -pub use self::checkpoint::CheckPoint; +pub use self::beacon_snapshot::BeaconSnapshot; pub use self::errors::{BeaconChainError, BlockProductionError}; pub use eth1_chain::{Eth1Chain, Eth1ChainBackend}; pub use events::EventHandler; diff --git a/beacon_node/beacon_chain/src/snapshot_cache.rs b/beacon_node/beacon_chain/src/snapshot_cache.rs new file mode 100644 index 000000000..fffe715f9 --- /dev/null +++ b/beacon_node/beacon_chain/src/snapshot_cache.rs @@ -0,0 +1,217 @@ +use crate::BeaconSnapshot; +use std::cmp; +use types::{Epoch, EthSpec, Hash256}; + +/// The default size of the cache. +pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4; + +/// Provides a cache of `BeaconSnapshot` that is intended primarily for block processing. +/// +/// ## Cache Queuing +/// +/// The cache has a non-standard queue mechanism (specifically, it is not LRU). +/// +/// The cache has a max number of elements (`max_len`). Until `max_len` is achieved, all snapshots +/// are simply added to the queue. Once `max_len` is achieved, adding a new snapshot will cause an +/// existing snapshot to be ejected. The ejected snapshot will: +/// +/// - Never be the `head_block_root`. +/// - Be the snapshot with the lowest `state.slot` (ties broken arbitrarily). +pub struct SnapshotCache { + max_len: usize, + head_block_root: Hash256, + snapshots: Vec>, +} + +impl SnapshotCache { + /// Instantiate a new cache which contains the `head` snapshot. + /// + /// Setting `max_len = 0` is equivalent to setting `max_len = 1`. + pub fn new(max_len: usize, head: BeaconSnapshot) -> Self { + Self { + max_len: cmp::max(max_len, 1), + head_block_root: head.beacon_block_root, + snapshots: vec![head], + } + } + + /// Insert a snapshot, potentially removing an existing snapshot if `self` is at capacity (see + /// struct-level documentation for more info). + pub fn insert(&mut self, snapshot: BeaconSnapshot) { + if self.snapshots.len() < self.max_len { + self.snapshots.push(snapshot); + } else { + let insert_at = self + .snapshots + .iter() + .enumerate() + .filter_map(|(i, snapshot)| { + if snapshot.beacon_block_root != self.head_block_root { + Some((i, snapshot.beacon_state.slot)) + } else { + None + } + }) + .min_by_key(|(_i, slot)| *slot) + .map(|(i, _slot)| i); + + if let Some(i) = insert_at { + self.snapshots[i] = snapshot; + } + } + } + + /// If there is a snapshot with `block_root`, remove and return it. + pub fn try_remove(&mut self, block_root: Hash256) -> Option> { + self.snapshots + .iter() + .position(|snapshot| snapshot.beacon_block_root == block_root) + .map(|i| self.snapshots.remove(i)) + } + + /// If there is a snapshot with `block_root`, clone it (with only the committee caches) and + /// return the clone. + pub fn get_cloned(&self, block_root: Hash256) -> Option> { + self.snapshots + .iter() + .find(|snapshot| snapshot.beacon_block_root == block_root) + .map(|snapshot| snapshot.clone_with_only_committee_caches()) + } + + /// Removes all snapshots from the queue that are less than or equal to the finalized epoch. + pub fn prune(&mut self, finalized_epoch: Epoch) { + self.snapshots.retain(|snapshot| { + snapshot.beacon_state.slot > finalized_epoch.start_slot(T::slots_per_epoch()) + }) + } + + /// Inform the cache that the head of the beacon chain has changed. + /// + /// The snapshot that matches this `head_block_root` will never be ejected from the cache + /// during `Self::insert`. + pub fn update_head(&mut self, head_block_root: Hash256) { + self.head_block_root = head_block_root + } +} + +#[cfg(test)] +mod test { + use super::*; + use types::{ + test_utils::{generate_deterministic_keypair, TestingBeaconStateBuilder}, + BeaconBlock, Epoch, MainnetEthSpec, Signature, SignedBeaconBlock, Slot, + }; + + const CACHE_SIZE: usize = 4; + + fn get_snapshot(i: u64) -> BeaconSnapshot { + let spec = MainnetEthSpec::default_spec(); + + let state_builder = TestingBeaconStateBuilder::from_deterministic_keypairs(1, &spec); + let (beacon_state, _keypairs) = state_builder.build(); + + BeaconSnapshot { + beacon_state, + beacon_state_root: Hash256::from_low_u64_be(i), + beacon_block: SignedBeaconBlock { + message: BeaconBlock::empty(&spec), + signature: Signature::new(&[42], &generate_deterministic_keypair(0).sk), + }, + beacon_block_root: Hash256::from_low_u64_be(i), + } + } + + #[test] + fn insert_get_prune_update() { + let mut cache = SnapshotCache::new(CACHE_SIZE, get_snapshot(0)); + + // Insert a bunch of entries in the cache. It should look like this: + // + // Index Root + // 0 0 <--head + // 1 1 + // 2 2 + // 3 3 + for i in 1..CACHE_SIZE as u64 { + let mut snapshot = get_snapshot(i); + + // Each snapshot should be one slot into an epoch, with each snapshot one epoch apart. + snapshot.beacon_state.slot = Slot::from(i * MainnetEthSpec::slots_per_epoch() + 1); + + cache.insert(snapshot); + + assert_eq!( + cache.snapshots.len(), + i as usize + 1, + "cache length should be as expected" + ); + assert_eq!(cache.head_block_root, Hash256::from_low_u64_be(0)); + } + + // Insert a new value in the cache. Afterwards it should look like: + // + // Index Root + // 0 0 <--head + // 1 42 + // 2 2 + // 3 3 + assert_eq!(cache.snapshots.len(), CACHE_SIZE); + cache.insert(get_snapshot(42)); + assert_eq!(cache.snapshots.len(), CACHE_SIZE); + + assert!( + cache.try_remove(Hash256::from_low_u64_be(1)).is_none(), + "the snapshot with the lowest slot should have been removed during the insert function" + ); + assert!(cache.get_cloned(Hash256::from_low_u64_be(1)).is_none()); + + assert!( + cache + .get_cloned(Hash256::from_low_u64_be(0)) + .expect("the head should still be in the cache") + .beacon_block_root + == Hash256::from_low_u64_be(0), + "get_cloned should get the correct snapshot" + ); + assert!( + cache + .try_remove(Hash256::from_low_u64_be(0)) + .expect("the head should still be in the cache") + .beacon_block_root + == Hash256::from_low_u64_be(0), + "try_remove should get the correct snapshot" + ); + + assert_eq!( + cache.snapshots.len(), + CACHE_SIZE - 1, + "try_remove should shorten the cache" + ); + + // Prune the cache. Afterwards it should look like: + // + // Index Root + // 0 2 + // 1 3 + cache.prune(Epoch::new(2)); + + assert_eq!(cache.snapshots.len(), 2); + + cache.update_head(Hash256::from_low_u64_be(2)); + + // Over-fill the cache so it needs to eject some old values on insert. + for i in 0..CACHE_SIZE as u64 { + cache.insert(get_snapshot(u64::max_value() - i)); + } + + // Ensure that the new head value was not removed from the cache. + assert!( + cache + .try_remove(Hash256::from_low_u64_be(2)) + .expect("the new head should still be in the cache") + .beacon_block_root + == Hash256::from_low_u64_be(2), + "try_remove should get the correct snapshot" + ); + } +} diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 6d793f6b3..d66cf60c2 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -731,7 +731,7 @@ where .ok_or_else(|| "system_time_slot_clock requires a beacon_chain_builder")?; let genesis_time = beacon_chain_builder - .finalized_checkpoint + .finalized_snapshot .as_ref() .ok_or_else(|| "system_time_slot_clock requires an initialized beacon state")? .beacon_state diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 48329b1e3..362b866ba 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -22,7 +22,6 @@ use std::convert::TryInto; use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; -use types::beacon_state::CloneConfig; use types::*; /// 32-byte key for accessing the `split` of the freezer DB. @@ -47,8 +46,6 @@ pub struct HotColdDB { pub(crate) hot_db: LevelDB, /// LRU cache of deserialized blocks. Updated whenever a block is loaded. block_cache: Mutex>>, - /// LRU cache of deserialized states. Updated whenever a state is loaded. - state_cache: Mutex>>, /// Chain spec. spec: ChainSpec, /// Logger. @@ -145,7 +142,7 @@ impl Store for HotColdDB { } /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: BeaconState) -> Result<(), Error> { + fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { if state.slot < self.get_split_slot() { self.store_cold_state(state_root, &state) } else { @@ -159,7 +156,7 @@ impl Store for HotColdDB { state_root: &Hash256, slot: Option, ) -> Result>, Error> { - self.get_state_with(state_root, slot, CloneConfig::all()) + self.get_state_with(state_root, slot) } /// Get a state from the store. @@ -169,7 +166,6 @@ impl Store for HotColdDB { &self, state_root: &Hash256, slot: Option, - clone_config: CloneConfig, ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT); @@ -177,10 +173,10 @@ impl Store for HotColdDB { if slot < self.get_split_slot() { self.load_cold_state_by_slot(slot).map(Some) } else { - self.load_hot_state(state_root, clone_config) + self.load_hot_state(state_root) } } else { - match self.load_hot_state(state_root, clone_config)? { + match self.load_hot_state(state_root)? { Some(state) => Ok(Some(state)), None => self.load_cold_state(state_root), } @@ -204,9 +200,6 @@ impl Store for HotColdDB { .key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())?; } - // Delete from the cache. - self.state_cache.lock().pop(state_root); - Ok(()) } @@ -309,10 +302,7 @@ impl Store for HotColdDB { { // NOTE: minor inefficiency here because we load an unnecessary hot state summary let state = self - .load_hot_state( - &epoch_boundary_state_root, - CloneConfig::committee_caches_only(), - )? + .load_hot_state(&epoch_boundary_state_root)? .ok_or_else(|| { HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root) })?; @@ -349,7 +339,6 @@ impl HotColdDB { cold_db: LevelDB::open(cold_path)?, hot_db: LevelDB::open(hot_path)?, block_cache: Mutex::new(LruCache::new(config.block_cache_size)), - state_cache: Mutex::new(LruCache::new(config.state_cache_size)), config, spec, log, @@ -371,7 +360,7 @@ impl HotColdDB { pub fn store_hot_state( &self, state_root: &Hash256, - state: BeaconState, + state: &BeaconState, ) -> Result<(), Error> { // On the epoch boundary, store the full state. if state.slot % E::slots_per_epoch() == 0 { @@ -387,10 +376,7 @@ impl HotColdDB { // Store a summary of the state. // We store one even for the epoch boundary states, as we may need their slots // when doing a look up by state root. - self.put_state_summary(state_root, HotStateSummary::new(state_root, &state)?)?; - - // Store the state in the cache. - self.state_cache.lock().put(*state_root, state); + self.put_state_summary(state_root, HotStateSummary::new(state_root, state)?)?; Ok(()) } @@ -398,24 +384,9 @@ impl HotColdDB { /// Load a post-finalization state from the hot database. /// /// Will replay blocks from the nearest epoch boundary. - pub fn load_hot_state( - &self, - state_root: &Hash256, - clone_config: CloneConfig, - ) -> Result>, Error> { + pub fn load_hot_state(&self, state_root: &Hash256) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT); - // Check the cache. - if let Some(state) = self.state_cache.lock().get(state_root) { - metrics::inc_counter(&metrics::BEACON_STATE_CACHE_HIT_COUNT); - - let timer = metrics::start_timer(&metrics::BEACON_STATE_CACHE_CLONE_TIME); - let state = state.clone_with(clone_config); - metrics::stop_timer(timer); - - return Ok(Some(state)); - } - if let Some(HotStateSummary { slot, latest_block_root, @@ -439,9 +410,6 @@ impl HotColdDB { self.replay_blocks(boundary_state, blocks, slot)? }; - // Update the LRU cache. - self.state_cache.lock().put(*state_root, state.clone()); - Ok(Some(state)) } else { Ok(None) diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index d0c19f358..43bdd164d 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -345,7 +345,7 @@ mod test { let state_a_root = hashes.next().unwrap(); state_b.state_roots[0] = state_a_root; - store.put_state(&state_a_root, state_a).unwrap(); + store.put_state(&state_a_root, &state_a).unwrap(); let iter = BlockRootsIterator::new(store, &state_b); @@ -393,8 +393,8 @@ mod test { let state_a_root = Hash256::from_low_u64_be(slots_per_historical_root as u64); let state_b_root = Hash256::from_low_u64_be(slots_per_historical_root as u64 * 2); - store.put_state(&state_a_root, state_a).unwrap(); - store.put_state(&state_b_root, state_b.clone()).unwrap(); + store.put_state(&state_a_root, &state_a).unwrap(); + store.put_state(&state_b_root, &state_b.clone()).unwrap(); let iter = StateRootsIterator::new(store, &state_b); diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 9ee296e62..4f519a77d 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -123,7 +123,7 @@ impl Store for LevelDB { } /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: BeaconState) -> Result<(), Error> { + fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { store_full_state(self, state_root, &state) } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 80da2ced1..a8220b08c 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -38,7 +38,6 @@ pub use errors::Error; pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; pub use metrics::scrape_for_metrics; pub use state_batch::StateBatch; -pub use types::beacon_state::CloneConfig; pub use types::*; /// An object capable of storing and retrieving objects implementing `StoreItem`. @@ -97,7 +96,7 @@ pub trait Store: Sync + Send + Sized + 'static { } /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: BeaconState) -> Result<(), Error>; + fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error>; /// Store a state summary in the store. // NOTE: this is a hack for the HotColdDb, we could consider splitting this @@ -122,7 +121,6 @@ pub trait Store: Sync + Send + Sized + 'static { &self, state_root: &Hash256, slot: Option, - _clone_config: CloneConfig, ) -> Result>, Error> { // Default impl ignores config. Overriden in `HotColdDb`. self.get_state(state_root, slot) diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 81826b31f..7216ca6da 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -76,7 +76,7 @@ impl Store for MemoryStore { } /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: BeaconState) -> Result<(), Error> { + fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { store_full_state(self, state_root, &state) } diff --git a/beacon_node/store/src/state_batch.rs b/beacon_node/store/src/state_batch.rs index a33e07225..c19173861 100644 --- a/beacon_node/store/src/state_batch.rs +++ b/beacon_node/store/src/state_batch.rs @@ -38,7 +38,7 @@ impl StateBatch { /// May fail to write the full batch if any of the items error (i.e. not atomic!) pub fn commit>(self, store: &S) -> Result<(), Error> { self.items.into_iter().try_for_each(|item| match item { - BatchItem::Full(state_root, state) => store.put_state(&state_root, state), + BatchItem::Full(state_root, state) => store.put_state(&state_root, &state), BatchItem::Summary(state_root, summary) => { store.put_state_summary(&state_root, summary) }