Add no-copy block processing cache (#863)

* Add state cache, remove store cache

* Only build the head committee cache

* Fix compile error

* Fix compile error from merge

* Rename state_cache -> checkpoint_cache

* Rename Checkpoint -> Snapshot

* Tidy, add comments

* Tidy up find_head function

* Change some checkpoint -> snapshot

* Add tests

* Expose max_len

* Remove dead code

* Tidy

* Fix bug
This commit is contained in:
Paul Hauner 2020-04-06 10:53:33 +10:00 committed by GitHub
parent 93bcee147d
commit 2fb6b7c793
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 494 additions and 254 deletions

View File

@ -1,4 +1,3 @@
use crate::checkpoint::CheckPoint;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
use crate::events::{EventHandler, EventKind};
@ -7,8 +6,10 @@ use crate::head_tracker::HeadTracker;
use crate::metrics;
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::ShufflingCache;
use crate::snapshot_cache::SnapshotCache;
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::BeaconSnapshot;
use operation_pool::{OperationPool, PersistedOperationPool};
use slog::{debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
@ -54,6 +55,9 @@ const MAXIMUM_BLOCK_SLOT_NUMBER: u64 = 4_294_967_296; // 2^32
/// head.
const HEAD_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
/// The time-out before failure during an operation to take a read/write RwLock on the block
/// processing cache.
const BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
/// The time-out before failure during an operation to take a read/write RwLock on the
/// attestation cache.
const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
@ -182,7 +186,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// Provides information from the Ethereum 1 (PoW) chain.
pub eth1_chain: Option<Eth1Chain<T::Eth1Chain, T::EthSpec, T::Store>>,
/// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received.
pub(crate) canonical_head: TimeoutRwLock<CheckPoint<T::EthSpec>>,
pub(crate) canonical_head: TimeoutRwLock<BeaconSnapshot<T::EthSpec>>,
/// The root of the genesis block.
pub genesis_block_root: Hash256,
/// A state-machine that is updated with information from the network and chooses a canonical
@ -192,6 +196,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub event_handler: T::EventHandler,
/// Used to track the heads of the beacon chain.
pub(crate) head_tracker: HeadTracker,
/// A cache dedicated to block processing.
pub(crate) block_processing_cache: TimeoutRwLock<SnapshotCache<T::EthSpec>>,
/// Caches the shuffling for a given epoch and state root.
pub(crate) shuffling_cache: TimeoutRwLock<ShufflingCache>,
/// Caches a map of `validator_index -> validator_pubkey`.
@ -444,34 +450,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(self.store.get_state(state_root, slot)?)
}
/// Returns the state at the given root, if any.
///
/// The return state does not contain any caches other than the committee caches. This method
/// is much faster than `Self::get_state` because it does not clone the tree hash cache
/// when the state is found in the cache.
///
/// ## Errors
///
/// May return a database error.
pub fn get_state_caching_only_with_committee_caches(
&self,
state_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<BeaconState<T::EthSpec>>, Error> {
Ok(self.store.get_state_with(
state_root,
slot,
types::beacon_state::CloneConfig::committee_caches_only(),
)?)
}
/// Returns a `Checkpoint` representing the head block and state. Contains the "best block";
/// the head of the canonical `BeaconChain`.
///
/// It is important to note that the `beacon_state` returned may not match the present slot. It
/// is the state as it was when the head block was received, which could be some slots prior to
/// now.
pub fn head(&self) -> Result<CheckPoint<T::EthSpec>, Error> {
pub fn head(&self) -> Result<BeaconSnapshot<T::EthSpec>, Error> {
self.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)
@ -703,7 +688,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
drop(head);
let mut state = self
.get_state_caching_only_with_committee_caches(&state_root, Some(slot))?
.get_state(&state_root, Some(slot))?
.ok_or_else(|| Error::MissingBeaconState(state_root))?;
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
@ -967,10 +952,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES);
let mut state = self
.get_state_caching_only_with_committee_caches(
&target_block_state_root,
Some(target_block_slot),
)?
.get_state(&target_block_state_root, Some(target_block_slot))?
.ok_or_else(|| Error::MissingBeaconState(target_block_state_root))?;
metrics::stop_timer(state_read_timer);
@ -1305,26 +1287,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// processing.
let db_read_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_READ);
// Load the blocks parent block from the database, returning invalid if that block is not
// found.
let parent_block = match self.get_block(&block.parent_root)? {
Some(block) => block,
None => {
return Ok(BlockProcessingOutcome::ParentUnknown {
parent: block.parent_root,
reference_location: "database",
});
}
};
let cached_snapshot = self
.block_processing_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|mut block_processing_cache| {
block_processing_cache.try_remove(block.parent_root)
});
// Load the parent blocks state from the database, returning an error if it is not found.
// It is an error because if we know the parent block we should also know the parent state.
let parent_state_root = parent_block.state_root();
let parent_state = self
.get_state(&parent_state_root, Some(parent_block.slot()))?
.ok_or_else(|| {
Error::DBInconsistent(format!("Missing state {:?}", parent_state_root))
})?;
let (parent_block, parent_state) = if let Some(snapshot) = cached_snapshot {
(snapshot.beacon_block, snapshot.beacon_state)
} else {
// Load the blocks parent block from the database, returning invalid if that block is not
// found.
let parent_block = match self.get_block(&block.parent_root)? {
Some(block) => block,
None => {
return Ok(BlockProcessingOutcome::ParentUnknown {
parent: block.parent_root,
reference_location: "database",
});
}
};
// Load the parent blocks state from the database, returning an error if it is not found.
// It is an error because if we know the parent block we should also know the parent state.
let parent_state_root = parent_block.state_root();
let parent_state = self
.get_state(&parent_state_root, Some(parent_block.slot()))?
.ok_or_else(|| {
Error::DBInconsistent(format!("Missing state {:?}", parent_state_root))
})?;
(parent_block, parent_state)
};
metrics::stop_timer(db_read_timer);
@ -1479,8 +1474,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// solution would be to use a database transaction (once our choice of database and API
// settles down).
// See: https://github.com/sigp/lighthouse/issues/692
self.store.put_state(&state_root, state)?;
self.store.put_block(&block_root, signed_block)?;
self.store.put_state(&state_root, &state)?;
self.store.put_block(&block_root, signed_block.clone())?;
self.block_processing_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.map(|mut block_processing_cache| {
block_processing_cache.insert(BeaconSnapshot {
beacon_block: signed_block,
beacon_block_root: block_root,
beacon_state: state,
beacon_state_root: state_root,
});
})
.unwrap_or_else(|| {
error!(
self.log,
"Failed to obtain cache write lock";
"lock" => "block_processing_cache",
"task" => "process block"
);
});
metrics::stop_timer(db_write_timer);
@ -1612,134 +1626,163 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Execute the fork choice algorithm and enthrone the result as the canonical head.
pub fn fork_choice(&self) -> Result<(), Error> {
metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS);
let overall_timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES);
// Start fork choice metrics timer.
let timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES);
// Determine the root of the block that is the head of the chain.
let beacon_block_root = self.fork_choice.find_head(&self)?;
// If a new head was chosen.
let result = if beacon_block_root != self.head_info()?.block_root {
metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD);
let beacon_block = self
.get_block(&beacon_block_root)?
.ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?;
let beacon_state_root = beacon_block.state_root();
let beacon_state: BeaconState<T::EthSpec> = self
.get_state(&beacon_state_root, Some(beacon_block.slot()))?
.ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?;
let previous_slot = self.head_info()?.slot;
let new_slot = beacon_block.slot();
// Note: this will declare a re-org if we skip `SLOTS_PER_HISTORICAL_ROOT` blocks
// between calls to fork choice without swapping between chains. This seems like an
// extreme-enough scenario that a warning is fine.
let is_reorg = self.head_info()?.block_root
!= beacon_state
.get_block_root(self.head_info()?.slot)
.map(|root| *root)
.unwrap_or_else(|_| Hash256::random());
// If we switched to a new chain (instead of building atop the present chain).
if is_reorg {
metrics::inc_counter(&metrics::FORK_CHOICE_REORG_COUNT);
warn!(
self.log,
"Beacon chain re-org";
"previous_head" => format!("{}", self.head_info()?.block_root),
"previous_slot" => previous_slot,
"new_head_parent" => format!("{}", beacon_block.parent_root()),
"new_head" => format!("{}", beacon_block_root),
"new_slot" => new_slot
);
} else {
debug!(
self.log,
"Head beacon block";
"justified_root" => format!("{}", beacon_state.current_justified_checkpoint.root),
"justified_epoch" => beacon_state.current_justified_checkpoint.epoch,
"finalized_root" => format!("{}", beacon_state.finalized_checkpoint.root),
"finalized_epoch" => beacon_state.finalized_checkpoint.epoch,
"root" => format!("{}", beacon_block_root),
"slot" => new_slot,
);
};
let old_finalized_epoch = self.head_info()?.finalized_checkpoint.epoch;
let new_finalized_epoch = beacon_state.finalized_checkpoint.epoch;
let finalized_root = beacon_state.finalized_checkpoint.root;
// Never revert back past a finalized epoch.
if new_finalized_epoch < old_finalized_epoch {
Err(Error::RevertedFinalizedEpoch {
previous_epoch: old_finalized_epoch,
new_epoch: new_finalized_epoch,
})
} else {
let previous_head_beacon_block_root = self
.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)?
.beacon_block_root;
let current_head_beacon_block_root = beacon_block_root;
let mut new_head = CheckPoint {
beacon_block,
beacon_block_root,
beacon_state,
beacon_state_root,
};
new_head.beacon_state.build_all_caches(&self.spec)?;
let timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES);
// Update the checkpoint that stores the head of the chain at the time it received the
// block.
*self
.canonical_head
.try_write_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)? = new_head;
metrics::stop_timer(timer);
if previous_slot.epoch(T::EthSpec::slots_per_epoch())
< new_slot.epoch(T::EthSpec::slots_per_epoch())
|| is_reorg
{
self.persist_head_and_fork_choice()?;
}
let _ = self.event_handler.register(EventKind::BeaconHeadChanged {
reorg: is_reorg,
previous_head_beacon_block_root,
current_head_beacon_block_root,
});
if new_finalized_epoch != old_finalized_epoch {
self.after_finalization(old_finalized_epoch, finalized_root)?;
}
Ok(())
}
} else {
Ok(())
};
// End fork choice metrics timer.
metrics::stop_timer(timer);
let result = self.fork_choice_internal();
if result.is_err() {
metrics::inc_counter(&metrics::FORK_CHOICE_ERRORS);
}
metrics::stop_timer(overall_timer);
result
}
fn fork_choice_internal(&self) -> Result<(), Error> {
// Determine the root of the block that is the head of the chain.
let beacon_block_root = self.fork_choice.find_head(&self)?;
let current_head = self.head_info()?;
if beacon_block_root == current_head.block_root {
return Ok(());
}
// At this point we know that the new head block is not the same as the previous one
metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD);
// Try and obtain the snapshot for `beacon_block_root` from the snapshot cache, falling
// back to a database read if that fails.
let new_head = self
.block_processing_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|block_processing_cache| block_processing_cache.get_cloned(beacon_block_root))
.map::<Result<_, Error>, _>(|snapshot| Ok(snapshot))
.unwrap_or_else(|| {
let beacon_block = self
.get_block(&beacon_block_root)?
.ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?;
let beacon_state_root = beacon_block.state_root();
let beacon_state: BeaconState<T::EthSpec> = self
.get_state(&beacon_state_root, Some(beacon_block.slot()))?
.ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?;
Ok(BeaconSnapshot {
beacon_block,
beacon_block_root,
beacon_state,
beacon_state_root,
})
})
.and_then(|mut snapshot| {
// Regardless of where we got the state from, attempt to build the committee
// caches.
snapshot
.beacon_state
.build_all_committee_caches(&self.spec)
.map_err(Into::into)
.map(|()| snapshot)
})?;
// Attempt to detect if the new head is not on the same chain as the previous block
// (i.e., a re-org).
//
// Note: this will declare a re-org if we skip `SLOTS_PER_HISTORICAL_ROOT` blocks
// between calls to fork choice without swapping between chains. This seems like an
// extreme-enough scenario that a warning is fine.
let is_reorg = current_head.block_root
!= new_head
.beacon_state
.get_block_root(current_head.slot)
.map(|root| *root)
.unwrap_or_else(|_| Hash256::random());
if is_reorg {
metrics::inc_counter(&metrics::FORK_CHOICE_REORG_COUNT);
warn!(
self.log,
"Beacon chain re-org";
"previous_head" => format!("{}", current_head.block_root),
"previous_slot" => current_head.slot,
"new_head_parent" => format!("{}", new_head.beacon_block.parent_root()),
"new_head" => format!("{}", beacon_block_root),
"new_slot" => new_head.beacon_block.slot()
);
} else {
debug!(
self.log,
"Head beacon block";
"justified_root" => format!("{}", new_head.beacon_state.current_justified_checkpoint.root),
"justified_epoch" => new_head.beacon_state.current_justified_checkpoint.epoch,
"finalized_root" => format!("{}", new_head.beacon_state.finalized_checkpoint.root),
"finalized_epoch" => new_head.beacon_state.finalized_checkpoint.epoch,
"root" => format!("{}", beacon_block_root),
"slot" => new_head.beacon_block.slot(),
);
};
let old_finalized_epoch = current_head.finalized_checkpoint.epoch;
let new_finalized_epoch = new_head.beacon_state.finalized_checkpoint.epoch;
let finalized_root = new_head.beacon_state.finalized_checkpoint.root;
// It is an error to try to update to a head with a lesser finalized epoch.
if new_finalized_epoch < old_finalized_epoch {
return Err(Error::RevertedFinalizedEpoch {
previous_epoch: old_finalized_epoch,
new_epoch: new_finalized_epoch,
});
}
if current_head.slot.epoch(T::EthSpec::slots_per_epoch())
< new_head
.beacon_state
.slot
.epoch(T::EthSpec::slots_per_epoch())
|| is_reorg
{
self.persist_head_and_fork_choice()?;
}
let update_head_timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES);
// Update the snapshot that stores the head of the chain at the time it received the
// block.
*self
.canonical_head
.try_write_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)? = new_head;
metrics::stop_timer(update_head_timer);
self.block_processing_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.map(|mut block_processing_cache| {
block_processing_cache.update_head(beacon_block_root);
})
.unwrap_or_else(|| {
error!(
self.log,
"Failed to obtain cache write lock";
"lock" => "block_processing_cache",
"task" => "update head"
);
});
if new_finalized_epoch != old_finalized_epoch {
self.after_finalization(old_finalized_epoch, finalized_root)?;
}
let _ = self.event_handler.register(EventKind::BeaconHeadChanged {
reorg: is_reorg,
previous_head_beacon_block_root: current_head.block_root,
current_head_beacon_block_root: beacon_block_root,
});
Ok(())
}
/// Called after `self` has had a new block finalized.
///
/// Performs pruning and finality-based optimizations.
@ -1764,11 +1807,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} else {
self.fork_choice.prune()?;
self.block_processing_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.map(|mut block_processing_cache| {
block_processing_cache.prune(new_finalized_epoch);
})
.unwrap_or_else(|| {
error!(
self.log,
"Failed to obtain cache write lock";
"lock" => "block_processing_cache",
"task" => "prune"
);
});
let finalized_state = self
.get_state_caching_only_with_committee_caches(
&finalized_block.state_root,
Some(finalized_block.slot),
)?
.get_state(&finalized_block.state_root, Some(finalized_block.slot))?
.ok_or_else(|| Error::MissingBeaconState(finalized_block.state_root))?;
self.op_pool.prune_all(&finalized_state, &self.spec);
@ -1801,10 +1855,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// This could be a very expensive operation and should only be done in testing/analysis
/// activities.
pub fn chain_dump(&self) -> Result<Vec<CheckPoint<T::EthSpec>>, Error> {
pub fn chain_dump(&self) -> Result<Vec<BeaconSnapshot<T::EthSpec>>, Error> {
let mut dump = vec![];
let mut last_slot = CheckPoint {
let mut last_slot = BeaconSnapshot {
beacon_block: self.head()?.beacon_block,
beacon_block_root: self.head()?.beacon_block_root,
beacon_state: self.head()?.beacon_state,
@ -1831,7 +1885,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Error::DBInconsistent(format!("Missing state {:?}", beacon_state_root))
})?;
let slot = CheckPoint {
let slot = BeaconSnapshot {
beacon_block,
beacon_block_root,
beacon_state,

View File

@ -5,14 +5,14 @@ use types::{BeaconState, EthSpec, Hash256, SignedBeaconBlock};
/// Represents some block and its associated state. Generally, this will be used for tracking the
/// head, justified head and finalized head.
#[derive(Clone, Serialize, PartialEq, Debug, Encode, Decode)]
pub struct CheckPoint<E: EthSpec> {
pub struct BeaconSnapshot<E: EthSpec> {
pub beacon_block: SignedBeaconBlock<E>,
pub beacon_block_root: Hash256,
pub beacon_state: BeaconState<E>,
pub beacon_state_root: Hash256,
}
impl<E: EthSpec> CheckPoint<E> {
impl<E: EthSpec> BeaconSnapshot<E> {
/// Create a new checkpoint.
pub fn new(
beacon_block: SignedBeaconBlock<E>,

View File

@ -7,10 +7,11 @@ use crate::fork_choice::SszForkChoice;
use crate::head_tracker::HeadTracker;
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::ShufflingCache;
use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE};
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{
BeaconChain, BeaconChainTypes, CheckPoint, Eth1Chain, Eth1ChainBackend, EventHandler,
BeaconChain, BeaconChainTypes, BeaconSnapshot, Eth1Chain, Eth1ChainBackend, EventHandler,
ForkChoice,
};
use eth1::Config as Eth1Config;
@ -71,10 +72,10 @@ where
pub struct BeaconChainBuilder<T: BeaconChainTypes> {
store: Option<Arc<T::Store>>,
store_migrator: Option<T::StoreMigrator>,
canonical_head: Option<CheckPoint<T::EthSpec>>,
canonical_head: Option<BeaconSnapshot<T::EthSpec>>,
/// The finalized checkpoint to anchor the chain. May be genesis or a higher
/// checkpoint.
pub finalized_checkpoint: Option<CheckPoint<T::EthSpec>>,
pub finalized_snapshot: Option<BeaconSnapshot<T::EthSpec>>,
genesis_block_root: Option<Hash256>,
op_pool: Option<OperationPool<T::EthSpec>>,
fork_choice: Option<ForkChoice<T>>,
@ -110,7 +111,7 @@ where
store: None,
store_migrator: None,
canonical_head: None,
finalized_checkpoint: None,
finalized_snapshot: None,
genesis_block_root: None,
op_pool: None,
fork_choice: None,
@ -260,14 +261,14 @@ where
.map_err(|e| format!("DB error when reading finalized state: {:?}", e))?
.ok_or_else(|| "Finalized state not found in store".to_string())?;
self.finalized_checkpoint = Some(CheckPoint {
self.finalized_snapshot = Some(BeaconSnapshot {
beacon_block_root: finalized_block_root,
beacon_block: finalized_block,
beacon_state_root: finalized_state_root,
beacon_state: finalized_state,
});
self.canonical_head = Some(CheckPoint {
self.canonical_head = Some(BeaconSnapshot {
beacon_block_root: head_block_root,
beacon_block: head_block,
beacon_state_root: head_state_root,
@ -304,7 +305,7 @@ where
self.genesis_block_root = Some(beacon_block_root);
store
.put_state(&beacon_state_root, beacon_state.clone())
.put_state(&beacon_state_root, &beacon_state)
.map_err(|e| format!("Failed to store genesis state: {:?}", e))?;
store
.put(&beacon_block_root, &beacon_block)
@ -318,7 +319,7 @@ where
)
})?;
self.finalized_checkpoint = Some(CheckPoint {
self.finalized_snapshot = Some(BeaconSnapshot {
beacon_block_root,
beacon_block,
beacon_state_root,
@ -380,7 +381,7 @@ where
let mut canonical_head = if let Some(head) = self.canonical_head {
head
} else {
self.finalized_checkpoint
self.finalized_snapshot
.ok_or_else(|| "Cannot build without a state".to_string())?
};
@ -420,7 +421,7 @@ where
.op_pool
.ok_or_else(|| "Cannot build without op pool".to_string())?,
eth1_chain: self.eth1_chain,
canonical_head: TimeoutRwLock::new(canonical_head),
canonical_head: TimeoutRwLock::new(canonical_head.clone()),
genesis_block_root: self
.genesis_block_root
.ok_or_else(|| "Cannot build without a genesis block root".to_string())?,
@ -431,6 +432,10 @@ where
.event_handler
.ok_or_else(|| "Cannot build without an event handler".to_string())?,
head_tracker: self.head_tracker.unwrap_or_default(),
block_processing_cache: TimeoutRwLock::new(SnapshotCache::new(
DEFAULT_SNAPSHOT_CACHE_SIZE,
canonical_head,
)),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
log: log.clone(),
@ -482,30 +487,30 @@ where
ForkChoice::from_ssz_container(persisted)
.map_err(|e| format!("Unable to read persisted fork choice from disk: {:?}", e))?
} else {
let finalized_checkpoint = &self
.finalized_checkpoint
let finalized_snapshot = &self
.finalized_snapshot
.as_ref()
.ok_or_else(|| "fork_choice_backend requires a finalized_checkpoint")?;
.ok_or_else(|| "fork_choice_backend requires a finalized_snapshot")?;
let genesis_block_root = self
.genesis_block_root
.ok_or_else(|| "fork_choice_backend requires a genesis_block_root")?;
let backend = ProtoArrayForkChoice::new(
finalized_checkpoint.beacon_block.message.slot,
finalized_checkpoint.beacon_block.message.state_root,
finalized_snapshot.beacon_block.message.slot,
finalized_snapshot.beacon_block.message.state_root,
// Note: here we set the `justified_epoch` to be the same as the epoch of the
// finalized checkpoint. Whilst this finalized checkpoint may actually point to
// a _later_ justified checkpoint, that checkpoint won't yet exist in the fork
// choice.
finalized_checkpoint.beacon_state.current_epoch(),
finalized_checkpoint.beacon_state.current_epoch(),
finalized_checkpoint.beacon_block_root,
finalized_snapshot.beacon_state.current_epoch(),
finalized_snapshot.beacon_state.current_epoch(),
finalized_snapshot.beacon_block_root,
)?;
ForkChoice::new(
backend,
genesis_block_root,
&finalized_checkpoint.beacon_state,
&finalized_snapshot.beacon_state,
)
};
@ -576,7 +581,7 @@ where
/// Requires the state to be initialized.
pub fn testing_slot_clock(self, slot_duration: Duration) -> Result<Self, String> {
let genesis_time = self
.finalized_checkpoint
.finalized_snapshot
.as_ref()
.ok_or_else(|| "testing_slot_clock requires an initialized state")?
.beacon_state

View File

@ -306,10 +306,7 @@ impl CheckpointManager {
.ok_or_else(|| Error::UnknownJustifiedBlock(block_root))?;
let state = chain
.get_state_caching_only_with_committee_caches(
&block.state_root(),
Some(block.slot()),
)?
.get_state(&block.state_root(), Some(block.slot()))?
.ok_or_else(|| Error::UnknownJustifiedState(block.state_root()))?;
Ok(get_effective_balances(&state))

View File

@ -3,8 +3,8 @@
extern crate lazy_static;
mod beacon_chain;
mod beacon_snapshot;
pub mod builder;
mod checkpoint;
mod errors;
pub mod eth1_chain;
pub mod events;
@ -13,6 +13,7 @@ mod head_tracker;
mod metrics;
mod persisted_beacon_chain;
mod shuffling_cache;
mod snapshot_cache;
pub mod test_utils;
mod timeout_rw_lock;
mod validator_pubkey_cache;
@ -21,7 +22,7 @@ pub use self::beacon_chain::{
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockProcessingOutcome,
StateSkipConfig,
};
pub use self::checkpoint::CheckPoint;
pub use self::beacon_snapshot::BeaconSnapshot;
pub use self::errors::{BeaconChainError, BlockProductionError};
pub use eth1_chain::{Eth1Chain, Eth1ChainBackend};
pub use events::EventHandler;

View File

@ -0,0 +1,217 @@
use crate::BeaconSnapshot;
use std::cmp;
use types::{Epoch, EthSpec, Hash256};
/// The default size of the cache.
pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4;
/// Provides a cache of `BeaconSnapshot` that is intended primarily for block processing.
///
/// ## Cache Queuing
///
/// The cache has a non-standard queue mechanism (specifically, it is not LRU).
///
/// The cache has a max number of elements (`max_len`). Until `max_len` is achieved, all snapshots
/// are simply added to the queue. Once `max_len` is achieved, adding a new snapshot will cause an
/// existing snapshot to be ejected. The ejected snapshot will:
///
/// - Never be the `head_block_root`.
/// - Be the snapshot with the lowest `state.slot` (ties broken arbitrarily).
pub struct SnapshotCache<T: EthSpec> {
max_len: usize,
head_block_root: Hash256,
snapshots: Vec<BeaconSnapshot<T>>,
}
impl<T: EthSpec> SnapshotCache<T> {
/// Instantiate a new cache which contains the `head` snapshot.
///
/// Setting `max_len = 0` is equivalent to setting `max_len = 1`.
pub fn new(max_len: usize, head: BeaconSnapshot<T>) -> Self {
Self {
max_len: cmp::max(max_len, 1),
head_block_root: head.beacon_block_root,
snapshots: vec![head],
}
}
/// Insert a snapshot, potentially removing an existing snapshot if `self` is at capacity (see
/// struct-level documentation for more info).
pub fn insert(&mut self, snapshot: BeaconSnapshot<T>) {
if self.snapshots.len() < self.max_len {
self.snapshots.push(snapshot);
} else {
let insert_at = self
.snapshots
.iter()
.enumerate()
.filter_map(|(i, snapshot)| {
if snapshot.beacon_block_root != self.head_block_root {
Some((i, snapshot.beacon_state.slot))
} else {
None
}
})
.min_by_key(|(_i, slot)| *slot)
.map(|(i, _slot)| i);
if let Some(i) = insert_at {
self.snapshots[i] = snapshot;
}
}
}
/// If there is a snapshot with `block_root`, remove and return it.
pub fn try_remove(&mut self, block_root: Hash256) -> Option<BeaconSnapshot<T>> {
self.snapshots
.iter()
.position(|snapshot| snapshot.beacon_block_root == block_root)
.map(|i| self.snapshots.remove(i))
}
/// If there is a snapshot with `block_root`, clone it (with only the committee caches) and
/// return the clone.
pub fn get_cloned(&self, block_root: Hash256) -> Option<BeaconSnapshot<T>> {
self.snapshots
.iter()
.find(|snapshot| snapshot.beacon_block_root == block_root)
.map(|snapshot| snapshot.clone_with_only_committee_caches())
}
/// Removes all snapshots from the queue that are less than or equal to the finalized epoch.
pub fn prune(&mut self, finalized_epoch: Epoch) {
self.snapshots.retain(|snapshot| {
snapshot.beacon_state.slot > finalized_epoch.start_slot(T::slots_per_epoch())
})
}
/// Inform the cache that the head of the beacon chain has changed.
///
/// The snapshot that matches this `head_block_root` will never be ejected from the cache
/// during `Self::insert`.
pub fn update_head(&mut self, head_block_root: Hash256) {
self.head_block_root = head_block_root
}
}
#[cfg(test)]
mod test {
use super::*;
use types::{
test_utils::{generate_deterministic_keypair, TestingBeaconStateBuilder},
BeaconBlock, Epoch, MainnetEthSpec, Signature, SignedBeaconBlock, Slot,
};
const CACHE_SIZE: usize = 4;
fn get_snapshot(i: u64) -> BeaconSnapshot<MainnetEthSpec> {
let spec = MainnetEthSpec::default_spec();
let state_builder = TestingBeaconStateBuilder::from_deterministic_keypairs(1, &spec);
let (beacon_state, _keypairs) = state_builder.build();
BeaconSnapshot {
beacon_state,
beacon_state_root: Hash256::from_low_u64_be(i),
beacon_block: SignedBeaconBlock {
message: BeaconBlock::empty(&spec),
signature: Signature::new(&[42], &generate_deterministic_keypair(0).sk),
},
beacon_block_root: Hash256::from_low_u64_be(i),
}
}
#[test]
fn insert_get_prune_update() {
let mut cache = SnapshotCache::new(CACHE_SIZE, get_snapshot(0));
// Insert a bunch of entries in the cache. It should look like this:
//
// Index Root
// 0 0 <--head
// 1 1
// 2 2
// 3 3
for i in 1..CACHE_SIZE as u64 {
let mut snapshot = get_snapshot(i);
// Each snapshot should be one slot into an epoch, with each snapshot one epoch apart.
snapshot.beacon_state.slot = Slot::from(i * MainnetEthSpec::slots_per_epoch() + 1);
cache.insert(snapshot);
assert_eq!(
cache.snapshots.len(),
i as usize + 1,
"cache length should be as expected"
);
assert_eq!(cache.head_block_root, Hash256::from_low_u64_be(0));
}
// Insert a new value in the cache. Afterwards it should look like:
//
// Index Root
// 0 0 <--head
// 1 42
// 2 2
// 3 3
assert_eq!(cache.snapshots.len(), CACHE_SIZE);
cache.insert(get_snapshot(42));
assert_eq!(cache.snapshots.len(), CACHE_SIZE);
assert!(
cache.try_remove(Hash256::from_low_u64_be(1)).is_none(),
"the snapshot with the lowest slot should have been removed during the insert function"
);
assert!(cache.get_cloned(Hash256::from_low_u64_be(1)).is_none());
assert!(
cache
.get_cloned(Hash256::from_low_u64_be(0))
.expect("the head should still be in the cache")
.beacon_block_root
== Hash256::from_low_u64_be(0),
"get_cloned should get the correct snapshot"
);
assert!(
cache
.try_remove(Hash256::from_low_u64_be(0))
.expect("the head should still be in the cache")
.beacon_block_root
== Hash256::from_low_u64_be(0),
"try_remove should get the correct snapshot"
);
assert_eq!(
cache.snapshots.len(),
CACHE_SIZE - 1,
"try_remove should shorten the cache"
);
// Prune the cache. Afterwards it should look like:
//
// Index Root
// 0 2
// 1 3
cache.prune(Epoch::new(2));
assert_eq!(cache.snapshots.len(), 2);
cache.update_head(Hash256::from_low_u64_be(2));
// Over-fill the cache so it needs to eject some old values on insert.
for i in 0..CACHE_SIZE as u64 {
cache.insert(get_snapshot(u64::max_value() - i));
}
// Ensure that the new head value was not removed from the cache.
assert!(
cache
.try_remove(Hash256::from_low_u64_be(2))
.expect("the new head should still be in the cache")
.beacon_block_root
== Hash256::from_low_u64_be(2),
"try_remove should get the correct snapshot"
);
}
}

View File

@ -731,7 +731,7 @@ where
.ok_or_else(|| "system_time_slot_clock requires a beacon_chain_builder")?;
let genesis_time = beacon_chain_builder
.finalized_checkpoint
.finalized_snapshot
.as_ref()
.ok_or_else(|| "system_time_slot_clock requires an initialized beacon state")?
.beacon_state

View File

@ -22,7 +22,6 @@ use std::convert::TryInto;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use types::beacon_state::CloneConfig;
use types::*;
/// 32-byte key for accessing the `split` of the freezer DB.
@ -47,8 +46,6 @@ pub struct HotColdDB<E: EthSpec> {
pub(crate) hot_db: LevelDB<E>,
/// LRU cache of deserialized blocks. Updated whenever a block is loaded.
block_cache: Mutex<LruCache<Hash256, SignedBeaconBlock<E>>>,
/// LRU cache of deserialized states. Updated whenever a state is loaded.
state_cache: Mutex<LruCache<Hash256, BeaconState<E>>>,
/// Chain spec.
spec: ChainSpec,
/// Logger.
@ -145,7 +142,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
}
/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: BeaconState<E>) -> Result<(), Error> {
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
if state.slot < self.get_split_slot() {
self.store_cold_state(state_root, &state)
} else {
@ -159,7 +156,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
state_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<BeaconState<E>>, Error> {
self.get_state_with(state_root, slot, CloneConfig::all())
self.get_state_with(state_root, slot)
}
/// Get a state from the store.
@ -169,7 +166,6 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
&self,
state_root: &Hash256,
slot: Option<Slot>,
clone_config: CloneConfig,
) -> Result<Option<BeaconState<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT);
@ -177,10 +173,10 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
if slot < self.get_split_slot() {
self.load_cold_state_by_slot(slot).map(Some)
} else {
self.load_hot_state(state_root, clone_config)
self.load_hot_state(state_root)
}
} else {
match self.load_hot_state(state_root, clone_config)? {
match self.load_hot_state(state_root)? {
Some(state) => Ok(Some(state)),
None => self.load_cold_state(state_root),
}
@ -204,9 +200,6 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
.key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())?;
}
// Delete from the cache.
self.state_cache.lock().pop(state_root);
Ok(())
}
@ -309,10 +302,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
{
// NOTE: minor inefficiency here because we load an unnecessary hot state summary
let state = self
.load_hot_state(
&epoch_boundary_state_root,
CloneConfig::committee_caches_only(),
)?
.load_hot_state(&epoch_boundary_state_root)?
.ok_or_else(|| {
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root)
})?;
@ -349,7 +339,6 @@ impl<E: EthSpec> HotColdDB<E> {
cold_db: LevelDB::open(cold_path)?,
hot_db: LevelDB::open(hot_path)?,
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
state_cache: Mutex::new(LruCache::new(config.state_cache_size)),
config,
spec,
log,
@ -371,7 +360,7 @@ impl<E: EthSpec> HotColdDB<E> {
pub fn store_hot_state(
&self,
state_root: &Hash256,
state: BeaconState<E>,
state: &BeaconState<E>,
) -> Result<(), Error> {
// On the epoch boundary, store the full state.
if state.slot % E::slots_per_epoch() == 0 {
@ -387,10 +376,7 @@ impl<E: EthSpec> HotColdDB<E> {
// Store a summary of the state.
// We store one even for the epoch boundary states, as we may need their slots
// when doing a look up by state root.
self.put_state_summary(state_root, HotStateSummary::new(state_root, &state)?)?;
// Store the state in the cache.
self.state_cache.lock().put(*state_root, state);
self.put_state_summary(state_root, HotStateSummary::new(state_root, state)?)?;
Ok(())
}
@ -398,24 +384,9 @@ impl<E: EthSpec> HotColdDB<E> {
/// Load a post-finalization state from the hot database.
///
/// Will replay blocks from the nearest epoch boundary.
pub fn load_hot_state(
&self,
state_root: &Hash256,
clone_config: CloneConfig,
) -> Result<Option<BeaconState<E>>, Error> {
pub fn load_hot_state(&self, state_root: &Hash256) -> Result<Option<BeaconState<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
// Check the cache.
if let Some(state) = self.state_cache.lock().get(state_root) {
metrics::inc_counter(&metrics::BEACON_STATE_CACHE_HIT_COUNT);
let timer = metrics::start_timer(&metrics::BEACON_STATE_CACHE_CLONE_TIME);
let state = state.clone_with(clone_config);
metrics::stop_timer(timer);
return Ok(Some(state));
}
if let Some(HotStateSummary {
slot,
latest_block_root,
@ -439,9 +410,6 @@ impl<E: EthSpec> HotColdDB<E> {
self.replay_blocks(boundary_state, blocks, slot)?
};
// Update the LRU cache.
self.state_cache.lock().put(*state_root, state.clone());
Ok(Some(state))
} else {
Ok(None)

View File

@ -345,7 +345,7 @@ mod test {
let state_a_root = hashes.next().unwrap();
state_b.state_roots[0] = state_a_root;
store.put_state(&state_a_root, state_a).unwrap();
store.put_state(&state_a_root, &state_a).unwrap();
let iter = BlockRootsIterator::new(store, &state_b);
@ -393,8 +393,8 @@ mod test {
let state_a_root = Hash256::from_low_u64_be(slots_per_historical_root as u64);
let state_b_root = Hash256::from_low_u64_be(slots_per_historical_root as u64 * 2);
store.put_state(&state_a_root, state_a).unwrap();
store.put_state(&state_b_root, state_b.clone()).unwrap();
store.put_state(&state_a_root, &state_a).unwrap();
store.put_state(&state_b_root, &state_b.clone()).unwrap();
let iter = StateRootsIterator::new(store, &state_b);

View File

@ -123,7 +123,7 @@ impl<E: EthSpec> Store<E> for LevelDB<E> {
}
/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: BeaconState<E>) -> Result<(), Error> {
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
store_full_state(self, state_root, &state)
}

View File

@ -38,7 +38,6 @@ pub use errors::Error;
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
pub use metrics::scrape_for_metrics;
pub use state_batch::StateBatch;
pub use types::beacon_state::CloneConfig;
pub use types::*;
/// An object capable of storing and retrieving objects implementing `StoreItem`.
@ -97,7 +96,7 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
}
/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: BeaconState<E>) -> Result<(), Error>;
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error>;
/// Store a state summary in the store.
// NOTE: this is a hack for the HotColdDb, we could consider splitting this
@ -122,7 +121,6 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
&self,
state_root: &Hash256,
slot: Option<Slot>,
_clone_config: CloneConfig,
) -> Result<Option<BeaconState<E>>, Error> {
// Default impl ignores config. Overriden in `HotColdDb`.
self.get_state(state_root, slot)

View File

@ -76,7 +76,7 @@ impl<E: EthSpec> Store<E> for MemoryStore<E> {
}
/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: BeaconState<E>) -> Result<(), Error> {
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
store_full_state(self, state_root, &state)
}

View File

@ -38,7 +38,7 @@ impl<E: EthSpec> StateBatch<E> {
/// May fail to write the full batch if any of the items error (i.e. not atomic!)
pub fn commit<S: Store<E>>(self, store: &S) -> Result<(), Error> {
self.items.into_iter().try_for_each(|item| match item {
BatchItem::Full(state_root, state) => store.put_state(&state_root, state),
BatchItem::Full(state_root, state) => store.put_state(&state_root, &state),
BatchItem::Summary(state_root, summary) => {
store.put_state_summary(&state_root, summary)
}