Add LRU cache to database (#837)

* Add LRU caches to store

* Improvements to LRU caches

* Take state by value in `Store::put_state`

* Store blocks by value, configurable cache sizes

* Use a StateBatch to efficiently store skip states

* Fix store tests

* Add CloneConfig test, remove unused metrics

* Use Mutexes instead of RwLocks for LRU caches
This commit is contained in:
Michael Sproul 2020-02-10 11:30:21 +11:00 committed by GitHub
parent c3182e3c1c
commit e0b9fa599f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 514 additions and 385 deletions

1
Cargo.lock generated
View File

@ -3968,6 +3968,7 @@ dependencies = [
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"leveldb 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)",
"lighthouse_metrics 0.1.0",
"lru 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,5 +1,4 @@
use crate::checkpoint::CheckPoint;
use crate::checkpoint_cache::CheckPointCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
use crate::events::{EventHandler, EventKind};
@ -22,7 +21,6 @@ use state_processing::per_block_processing::{
use state_processing::{
per_block_processing, per_slot_processing, BlockProcessingError, BlockSignatureStrategy,
};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::fs;
use std::io::prelude::*;
@ -31,7 +29,7 @@ use std::time::{Duration, Instant};
use store::iter::{
BlockRootsIterator, ReverseBlockRootIterator, ReverseStateRootIterator, StateRootsIterator,
};
use store::{Error as DBError, Migrate, Store};
use store::{Error as DBError, Migrate, StateBatch, Store};
use tree_hash::TreeHash;
use types::*;
@ -149,8 +147,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub event_handler: T::EventHandler,
/// Used to track the heads of the beacon chain.
pub(crate) head_tracker: HeadTracker,
/// Provides a small cache of `BeaconState` and `BeaconBlock`.
pub(crate) checkpoint_cache: CheckPointCache<T::EthSpec>,
/// Logging to CLI, etc.
pub(crate) log: Logger,
}
@ -168,11 +164,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let beacon_block_root = canonical_head.beacon_state.finalized_checkpoint.root;
let beacon_block = self
.store
.get::<BeaconBlock<_>>(&beacon_block_root)?
.get_block(&beacon_block_root)?
.ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?;
let beacon_state_root = beacon_block.state_root;
let beacon_state = self
.get_state_caching(&beacon_state_root, Some(beacon_block.slot))?
.get_state(&beacon_state_root, Some(beacon_block.slot))?
.ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?;
CheckPoint {
@ -306,10 +302,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
) -> Result<ReverseBlockRootIterator<T::EthSpec, T::Store>, Error> {
let block = self
.get_block_caching(&block_root)?
.get_block(&block_root)?
.ok_or_else(|| Error::MissingBeaconBlock(block_root))?;
let state = self
.get_state_caching(&block.state_root, Some(block.slot))?
.get_state(&block.state_root, Some(block.slot))?
.ok_or_else(|| Error::MissingBeaconState(block.state_root))?;
let iter = BlockRootsIterator::owned(self.store.clone(), state);
Ok(ReverseBlockRootIterator::new(
@ -392,7 +388,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block_root: &Hash256,
) -> Result<Option<BeaconBlock<T::EthSpec>>, Error> {
Ok(self.store.get(block_root)?)
Ok(self.store.get_block(block_root)?)
}
/// Returns the state at the given root, if any.
@ -408,44 +404,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(self.store.get_state(state_root, slot)?)
}
/// Returns the block at the given root, if any.
///
/// ## Errors
///
/// May return a database error.
pub fn get_block_caching(
&self,
block_root: &Hash256,
) -> Result<Option<BeaconBlock<T::EthSpec>>, Error> {
if let Some(block) = self.checkpoint_cache.get_block(block_root) {
Ok(Some(block))
} else {
Ok(self.store.get(block_root)?)
}
}
/// Returns the state at the given root, if any.
///
/// ## Errors
///
/// May return a database error.
pub fn get_state_caching(
&self,
state_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<BeaconState<T::EthSpec>>, Error> {
if let Some(state) = self.checkpoint_cache.get_state(state_root) {
Ok(Some(state))
} else {
Ok(self.store.get_state(state_root, slot)?)
}
}
/// Returns the state at the given root, if any.
///
/// The return state does not contain any caches other than the committee caches. This method
/// is much faster than `Self::get_state_caching` because it does not clone the tree hash cache
/// when the state is found in the checkpoint cache.
/// is much faster than `Self::get_state` because it does not clone the tree hash cache
/// when the state is found in the cache.
///
/// ## Errors
///
@ -455,14 +418,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
state_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<BeaconState<T::EthSpec>>, Error> {
if let Some(state) = self
.checkpoint_cache
.get_state_only_with_committee_cache(state_root)
{
Ok(Some(state))
} else {
Ok(self.store.get_state(state_root, slot)?)
}
Ok(self.store.get_state_with(
state_root,
slot,
types::beacon_state::CloneConfig::committee_caches_only(),
)?)
}
/// Returns a `Checkpoint` representing the head block and state. Contains the "best block";
@ -568,7 +528,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.ok_or_else(|| Error::NoStateForSlot(slot))?;
Ok(self
.get_state_caching(&state_root, Some(slot))?
.get_state(&state_root, Some(slot))?
.ok_or_else(|| Error::NoStateForSlot(slot))?)
}
}
@ -890,7 +850,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// An honest validator would have set this block to be the head of the chain (i.e., the
// result of running fork choice).
let result = if let Some(attestation_head_block) =
self.get_block_caching(&attestation.data.beacon_block_root)?
self.get_block(&attestation.data.beacon_block_root)?
{
// If the attestation points to a block in the same epoch in which it was made,
// then it is sufficient to load the state from that epoch's boundary, because
@ -1274,8 +1234,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Load the blocks parent block from the database, returning invalid if that block is not
// found.
let parent_block: BeaconBlock<T::EthSpec> =
match self.get_block_caching(&block.parent_root)? {
let parent_block = match self.get_block(&block.parent_root)? {
Some(block) => block,
None => {
return Ok(BlockProcessingOutcome::ParentUnknown {
@ -1289,7 +1248,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// It is an error because if we know the parent block we should also know the parent state.
let parent_state_root = parent_block.state_root;
let parent_state = self
.get_state_caching(&parent_state_root, Some(parent_block.slot))?
.get_state(&parent_state_root, Some(parent_block.slot))?
.ok_or_else(|| {
Error::DBInconsistent(format!("Missing state {:?}", parent_state_root))
})?;
@ -1300,25 +1259,26 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let catchup_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CATCHUP_STATE);
// Keep a list of any states that were "skipped" (block-less) in between the parent state
// slot and the block slot. These will need to be stored in the database.
let mut intermediate_states = vec![];
// Keep a batch of any states that were "skipped" (block-less) in between the parent state
// slot and the block slot. These will be stored in the database.
let mut intermediate_states = StateBatch::new();
// Transition the parent state to the block slot.
let mut state: BeaconState<T::EthSpec> = parent_state;
let distance = block.slot.as_u64().saturating_sub(state.slot.as_u64());
for i in 0..distance {
if i > 0 {
intermediate_states.push(state.clone());
}
let state_root = if i == 0 {
Some(parent_block.state_root)
parent_block.state_root
} else {
None
// This is a new state we've reached, so stage it for storage in the DB.
// Computing the state root here is time-equivalent to computing it during slot
// processing, but we get early access to it.
let state_root = state.update_tree_hash_cache()?;
intermediate_states.add_state(state_root, &state)?;
state_root
};
per_slot_processing(&mut state, state_root, &self.spec)?;
per_slot_processing(&mut state, Some(state_root), &self.spec)?;
}
metrics::stop_timer(catchup_timer);
@ -1393,23 +1353,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
metrics::stop_timer(fork_choice_register_timer);
self.head_tracker.register_block(block_root, &block);
metrics::observe(
&metrics::OPERATIONS_PER_BLOCK_ATTESTATION,
block.body.attestations.len() as f64,
);
let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);
// Store all the states between the parent block state and this blocks slot before storing
// Store all the states between the parent block state and this block's slot before storing
// the final state.
for (i, intermediate_state) in intermediate_states.iter().enumerate() {
// To avoid doing an unnecessary tree hash, use the following (slot + 1) state's
// state_roots field to find the root.
let following_state = match intermediate_states.get(i + 1) {
Some(following_state) => following_state,
None => &state,
};
let intermediate_state_root =
following_state.get_state_root(intermediate_state.slot)?;
self.store
.put_state(&intermediate_state_root, intermediate_state)?;
}
intermediate_states.commit(&*self.store)?;
// Store the block and state.
// NOTE: we store the block *after* the state to guard against inconsistency in the event of
@ -1417,29 +1371,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// solution would be to use a database transaction (once our choice of database and API
// settles down).
// See: https://github.com/sigp/lighthouse/issues/692
self.store.put_state(&state_root, &state)?;
self.store.put(&block_root, &block)?;
self.store.put_state(&state_root, state)?;
self.store.put_block(&block_root, block)?;
metrics::stop_timer(db_write_timer);
self.head_tracker.register_block(block_root, &block);
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
metrics::observe(
&metrics::OPERATIONS_PER_BLOCK_ATTESTATION,
block.body.attestations.len() as f64,
);
// Store the block in the checkpoint cache.
//
// A block that was just imported is likely to be referenced by the next block that we
// import.
self.checkpoint_cache.insert(Cow::Owned(CheckPoint {
beacon_block_root: block_root,
beacon_block: block,
beacon_state_root: state_root,
beacon_state: state,
}));
metrics::stop_timer(full_timer);
@ -1575,13 +1512,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let result = if beacon_block_root != self.head_info()?.block_root {
metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD);
let beacon_block: BeaconBlock<T::EthSpec> = self
.get_block_caching(&beacon_block_root)?
let beacon_block = self
.get_block(&beacon_block_root)?
.ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?;
let beacon_state_root = beacon_block.state_root;
let beacon_state: BeaconState<T::EthSpec> = self
.get_state_caching(&beacon_state_root, Some(beacon_block.slot))?
.get_state(&beacon_state_root, Some(beacon_block.slot))?
.ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?;
let previous_slot = self.head_info()?.slot;
@ -1650,11 +1587,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES);
// Store the head in the checkpoint cache.
//
// The head block is likely to be referenced by the next imported block.
self.checkpoint_cache.insert(Cow::Borrowed(&new_head));
// Update the checkpoint that stores the head of the chain at the time it received the
// block.
*self
@ -1703,7 +1635,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<(), Error> {
let finalized_block = self
.store
.get::<BeaconBlock<T::EthSpec>>(&finalized_block_root)?
.get_block(&finalized_block_root)?
.ok_or_else(|| Error::MissingBeaconBlock(finalized_block_root))?;
let new_finalized_epoch = finalized_block.slot.epoch(T::EthSpec::slots_per_epoch());

View File

@ -1,4 +1,3 @@
use crate::checkpoint_cache::CheckPointCache;
use crate::eth1_chain::CachingEth1Backend;
use crate::events::NullEventHandler;
use crate::head_tracker::HeadTracker;
@ -219,7 +218,7 @@ where
self.genesis_block_root = Some(beacon_block_root);
store
.put_state(&beacon_state_root, &beacon_state)
.put_state(&beacon_state_root, beacon_state.clone())
.map_err(|e| format!("Failed to store genesis state: {:?}", e))?;
store
.put(&beacon_block_root, &beacon_block)
@ -334,7 +333,6 @@ where
.event_handler
.ok_or_else(|| "Cannot build without an event handler".to_string())?,
head_tracker: self.head_tracker.unwrap_or_default(),
checkpoint_cache: CheckPointCache::default(),
log: log.clone(),
};

View File

@ -1,125 +0,0 @@
use crate::checkpoint::CheckPoint;
use crate::metrics;
use parking_lot::RwLock;
use std::borrow::Cow;
use types::{BeaconBlock, BeaconState, EthSpec, Hash256};
const CACHE_SIZE: usize = 4;
struct Inner<T: EthSpec> {
oldest: usize,
limit: usize,
checkpoints: Vec<CheckPoint<T>>,
}
impl<T: EthSpec> Default for Inner<T> {
fn default() -> Self {
Self {
oldest: 0,
limit: CACHE_SIZE,
checkpoints: vec![],
}
}
}
pub struct CheckPointCache<T: EthSpec> {
inner: RwLock<Inner<T>>,
}
impl<T: EthSpec> Default for CheckPointCache<T> {
fn default() -> Self {
Self {
inner: RwLock::new(Inner::default()),
}
}
}
impl<T: EthSpec> CheckPointCache<T> {
pub fn insert(&self, checkpoint: Cow<CheckPoint<T>>) {
if self
.inner
.read()
.checkpoints
.iter()
// This is `O(n)` but whilst `n == 4` it ain't no thing.
.any(|local| local.beacon_state_root == checkpoint.beacon_state_root)
{
// Adding a known checkpoint to the cache should be a no-op.
return;
}
let mut inner = self.inner.write();
if inner.checkpoints.len() < inner.limit {
inner.checkpoints.push(checkpoint.into_owned())
} else {
let i = inner.oldest; // to satisfy the borrow checker.
inner.checkpoints[i] = checkpoint.into_owned();
inner.oldest += 1;
inner.oldest %= inner.limit;
}
}
pub fn get_state(&self, state_root: &Hash256) -> Option<BeaconState<T>> {
self.inner
.read()
.checkpoints
.iter()
// Also `O(n)`.
.find(|checkpoint| checkpoint.beacon_state_root == *state_root)
.map(|checkpoint| {
metrics::inc_counter(&metrics::CHECKPOINT_CACHE_HITS);
checkpoint.beacon_state.clone()
})
.or_else(|| {
metrics::inc_counter(&metrics::CHECKPOINT_CACHE_MISSES);
None
})
}
pub fn get_state_only_with_committee_cache(
&self,
state_root: &Hash256,
) -> Option<BeaconState<T>> {
self.inner
.read()
.checkpoints
.iter()
// Also `O(n)`.
.find(|checkpoint| checkpoint.beacon_state_root == *state_root)
.map(|checkpoint| {
metrics::inc_counter(&metrics::CHECKPOINT_CACHE_HITS);
let mut state = checkpoint.beacon_state.clone_without_caches();
state.committee_caches = checkpoint.beacon_state.committee_caches.clone();
state
})
.or_else(|| {
metrics::inc_counter(&metrics::CHECKPOINT_CACHE_MISSES);
None
})
}
pub fn get_block(&self, block_root: &Hash256) -> Option<BeaconBlock<T>> {
self.inner
.read()
.checkpoints
.iter()
// Also `O(n)`.
.find(|checkpoint| checkpoint.beacon_block_root == *block_root)
.map(|checkpoint| {
metrics::inc_counter(&metrics::CHECKPOINT_CACHE_HITS);
checkpoint.beacon_block.clone()
})
.or_else(|| {
metrics::inc_counter(&metrics::CHECKPOINT_CACHE_MISSES);
None
})
}
}

View File

@ -883,7 +883,7 @@ mod test {
&state
.get_state_root(prev_state.slot)
.expect("should find state root"),
&prev_state,
prev_state,
)
.expect("should store state");
@ -953,7 +953,7 @@ mod test {
&state
.get_state_root(Slot::new(0))
.expect("should find state root"),
&prev_state,
prev_state,
)
.expect("should store state");

View File

@ -302,7 +302,7 @@ impl CheckpointManager {
metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES);
let block = chain
.get_block_caching(&block_root)?
.get_block(&block_root)?
.ok_or_else(|| Error::UnknownJustifiedBlock(block_root))?;
let state = chain

View File

@ -5,7 +5,6 @@ extern crate lazy_static;
mod beacon_chain;
pub mod builder;
mod checkpoint;
mod checkpoint_cache;
mod errors;
pub mod eth1_chain;
pub mod events;

View File

@ -149,14 +149,6 @@ lazy_static! {
pub static ref PERSIST_CHAIN: Result<Histogram> =
try_create_histogram("beacon_persist_chain", "Time taken to update the canonical head");
/*
* Checkpoint cache
*/
pub static ref CHECKPOINT_CACHE_HITS: Result<IntCounter> =
try_create_int_counter("beacon_checkpoint_cache_hits_total", "Count of times checkpoint cache fulfils request");
pub static ref CHECKPOINT_CACHE_MISSES: Result<IntCounter> =
try_create_int_counter("beacon_checkpoint_cache_misses_total", "Count of times checkpoint cache fulfils request");
/*
* Eth1
*/

View File

@ -9,7 +9,7 @@ use beacon_chain::{
};
use sloggers::{null::NullLoggerBuilder, Build};
use std::sync::Arc;
use store::DiskStore;
use store::{DiskStore, StoreConfig};
use tempfile::{tempdir, TempDir};
use types::{EthSpec, Keypair, MinimalEthSpec};
@ -27,10 +27,10 @@ fn get_store(db_path: &TempDir) -> Arc<DiskStore<E>> {
let spec = E::default_spec();
let hot_path = db_path.path().join("hot_db");
let cold_path = db_path.path().join("cold_db");
let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64;
let config = StoreConfig::default();
let log = NullLoggerBuilder.build().expect("logger should build");
Arc::new(
DiskStore::open(&hot_path, &cold_path, slots_per_restore_point, spec, log)
DiskStore::open(&hot_path, &cold_path, config, spec, log)
.expect("disk store should initialize"),
)
}

View File

@ -10,7 +10,7 @@ use beacon_chain::AttestationProcessingOutcome;
use rand::Rng;
use sloggers::{null::NullLoggerBuilder, Build};
use std::sync::Arc;
use store::{DiskStore, Store};
use store::{DiskStore, Store, StoreConfig};
use tempfile::{tempdir, TempDir};
use tree_hash::TreeHash;
use types::test_utils::{SeedableRng, XorShiftRng};
@ -31,10 +31,10 @@ fn get_store(db_path: &TempDir) -> Arc<DiskStore<E>> {
let spec = MinimalEthSpec::default_spec();
let hot_path = db_path.path().join("hot_db");
let cold_path = db_path.path().join("cold_db");
let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64;
let config = StoreConfig::default();
let log = NullLoggerBuilder.build().expect("logger should build");
Arc::new(
DiskStore::open(&hot_path, &cold_path, slots_per_restore_point, spec, log)
DiskStore::open(&hot_path, &cold_path, config, spec, log)
.expect("disk store should initialize"),
)
}

View File

@ -7,7 +7,7 @@ use beacon_chain::{
slot_clock::{SlotClock, SystemTimeSlotClock},
store::{
migrate::{BackgroundMigrator, Migrate, NullMigrator},
DiskStore, MemoryStore, SimpleDiskStore, Store,
DiskStore, MemoryStore, SimpleDiskStore, Store, StoreConfig,
},
BeaconChain, BeaconChainTypes, Eth1ChainBackend, EventHandler,
};
@ -478,7 +478,7 @@ where
mut self,
hot_path: &Path,
cold_path: &Path,
slots_per_restore_point: u64,
config: StoreConfig,
) -> Result<Self, String> {
let context = self
.runtime_context
@ -490,13 +490,7 @@ where
.clone()
.ok_or_else(|| "disk_store requires a chain spec".to_string())?;
let store = DiskStore::open(
hot_path,
cold_path,
slots_per_restore_point,
spec,
context.log,
)
let store = DiskStore::open(hot_path, cold_path, config, spec, context.log)
.map_err(|e| format!("Unable to open database: {:?}", e))?;
self.store = Some(Arc::new(store));
Ok(self)

View File

@ -6,6 +6,9 @@ use std::path::PathBuf;
/// The number initial validators when starting the `Minimal`.
const TESTNET_SPEC_CONSTANTS: &str = "minimal";
/// Default directory name for the freezer database under the top-level data dir.
const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db";
/// Defines how the client should initialize the `BeaconChain` and other components.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ClientGenesis {
@ -41,6 +44,10 @@ impl Default for ClientGenesis {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
pub data_dir: PathBuf,
/// Name of the directory inside the data directory where the main "hot" DB is located.
pub db_name: String,
/// Path where the freezer database will be located.
pub freezer_db_path: Option<PathBuf>,
pub testnet_dir: Option<PathBuf>,
pub log_file: PathBuf,
pub spec_constants: String,
@ -64,6 +71,8 @@ impl Default for Config {
fn default() -> Self {
Self {
data_dir: PathBuf::from(".lighthouse"),
db_name: "chain_db".to_string(),
freezer_db_path: None,
testnet_dir: None,
log_file: PathBuf::from(""),
genesis: <_>::default(),
@ -83,7 +92,7 @@ impl Config {
/// Get the database path without initialising it.
pub fn get_db_path(&self) -> Option<PathBuf> {
self.get_data_dir()
.map(|data_dir| data_dir.join(&self.store.db_name))
.map(|data_dir| data_dir.join(&self.db_name))
}
/// Get the database path, creating it if necessary.
@ -97,7 +106,7 @@ impl Config {
/// Fetch default path to use for the freezer database.
fn default_freezer_db_path(&self) -> Option<PathBuf> {
self.get_data_dir()
.map(|data_dir| data_dir.join(self.store.default_freezer_db_dir()))
.map(|data_dir| data_dir.join(DEFAULT_FREEZER_DB_DIR))
}
/// Returns the path to which the client may initialize the on-disk freezer database.
@ -105,8 +114,7 @@ impl Config {
/// Will attempt to use the user-supplied path from e.g. the CLI, or will default
/// to a directory in the data_dir if no path is provided.
pub fn get_freezer_db_path(&self) -> Option<PathBuf> {
self.store
.freezer_db_path
self.freezer_db_path
.clone()
.or_else(|| self.default_freezer_db_path())
}

View File

@ -199,6 +199,20 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
DO NOT DECREASE AFTER INITIALIZATION. [default: 2048 (mainnet) or 64 (minimal)]")
.takes_value(true)
)
.arg(
Arg::with_name("block-cache-size")
.long("block-cache-size")
.value_name("SIZE")
.help("Specifies how many blocks the database should cache in memory [default: 5]")
.takes_value(true)
)
.arg(
Arg::with_name("state-cache-size")
.long("state-cache-size")
.value_name("SIZE")
.help("Specifies how many states the database should cache in memory [default: 5]")
.takes_value(true)
)
/*
* The "testnet" sub-command.
*

View File

@ -252,7 +252,7 @@ pub fn get_configs<E: EthSpec>(
};
if let Some(freezer_dir) = cli_args.value_of("freezer-dir") {
client_config.store.freezer_db_path = Some(PathBuf::from(freezer_dir));
client_config.freezer_db_path = Some(PathBuf::from(freezer_dir));
}
if let Some(slots_per_restore_point) = cli_args.value_of("slots-per-restore-point") {
@ -266,6 +266,18 @@ pub fn get_configs<E: EthSpec>(
);
}
if let Some(block_cache_size) = cli_args.value_of("block-cache-size") {
client_config.store.block_cache_size = block_cache_size
.parse()
.map_err(|_| "block-cache-size is not a valid integer".to_string())?;
}
if let Some(state_cache_size) = cli_args.value_of("state-cache-size") {
client_config.store.state_cache_size = state_cache_size
.parse()
.map_err(|_| "block-cache-size is not a valid integer".to_string())?;
}
if eth2_config.spec_constants != client_config.spec_constants {
crit!(log, "Specification constants do not match.";
"client_config" => client_config.spec_constants.to_string(),

View File

@ -90,11 +90,7 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
Ok(ClientBuilder::new(context.eth_spec_instance.clone())
.runtime_context(context)
.chain_spec(spec)
.disk_store(
&db_path,
&freezer_db_path_res?,
store_config.slots_per_restore_point,
)?
.disk_store(&db_path, &freezer_db_path_res?, store_config)?
.background_migrator()?)
})
.and_then(move |builder| {

View File

@ -29,3 +29,4 @@ serde = "1.0"
serde_derive = "1.0.102"
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" }
lru = "0.4.3"

View File

@ -1,36 +1,28 @@
use serde_derive::{Deserialize, Serialize};
use std::path::PathBuf;
use types::{EthSpec, MinimalEthSpec};
/// Default directory name for the freezer database under the top-level data dir.
const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db";
/// Default value for the freezer DB's restore point frequency.
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5;
pub const DEFAULT_STATE_CACHE_SIZE: usize = 5;
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Database configuration parameters.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StoreConfig {
/// Name of the directory inside the data directory where the main "hot" DB is located.
pub db_name: String,
/// Path where the freezer database will be located.
pub freezer_db_path: Option<PathBuf>,
/// Number of slots to wait between storing restore points in the freezer database.
pub slots_per_restore_point: u64,
/// Maximum number of blocks to store in the in-memory block cache.
pub block_cache_size: usize,
/// Maximum number of states to store in the in-memory state cache.
pub state_cache_size: usize,
}
impl Default for StoreConfig {
fn default() -> Self {
Self {
db_name: "chain_db".to_string(),
freezer_db_path: None,
// Safe default for tests, shouldn't ever be read by a CLI node.
slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
state_cache_size: DEFAULT_STATE_CACHE_SIZE,
}
}
}
impl StoreConfig {
pub fn default_freezer_db_dir(&self) -> &'static str {
DEFAULT_FREEZER_DB_DIR
}
}

View File

@ -1,12 +1,16 @@
use crate::chunked_vector::{
store_updated_vector, BlockRoots, HistoricalRoots, RandaoMixes, StateRoots,
};
use crate::config::StoreConfig;
use crate::forwards_iter::HybridForwardsBlockRootsIterator;
use crate::impls::beacon_state::store_full_state;
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::metrics;
use crate::{
leveldb_store::LevelDB, DBColumn, Error, PartialBeaconState, SimpleStoreItem, Store, StoreItem,
};
use parking_lot::RwLock;
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use slog::{debug, trace, warn, Logger};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
@ -18,6 +22,7 @@ use std::convert::TryInto;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use types::beacon_state::CloneConfig;
use types::*;
/// 32-byte key for accessing the `split` of the freezer DB.
@ -33,14 +38,17 @@ pub struct HotColdDB<E: EthSpec> {
/// States with slots less than `split.slot` are in the cold DB, while states with slots
/// greater than or equal are in the hot DB.
split: RwLock<Split>,
/// Number of slots per restore point state in the freezer database.
slots_per_restore_point: u64,
config: StoreConfig,
/// Cold database containing compact historical data.
pub(crate) cold_db: LevelDB<E>,
/// Hot database containing duplicated but quick-to-access recent data.
///
/// The hot database also contains all blocks.
pub(crate) hot_db: LevelDB<E>,
/// LRU cache of deserialized blocks. Updated whenever a block is loaded.
block_cache: Mutex<LruCache<Hash256, BeaconBlock<E>>>,
/// LRU cache of deserialized states. Updated whenever a state is loaded.
state_cache: Mutex<LruCache<Hash256, BeaconState<E>>>,
/// Chain spec.
spec: ChainSpec,
/// Logger.
@ -98,10 +106,42 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
self.hot_db.key_delete(column, key)
}
/// Store a block and update the LRU cache.
fn put_block(&self, block_root: &Hash256, block: BeaconBlock<E>) -> Result<(), Error> {
// Store on disk.
self.put(block_root, &block)?;
// Update cache.
self.block_cache.lock().put(*block_root, block);
Ok(())
}
/// Fetch a block from the store.
fn get_block(&self, block_root: &Hash256) -> Result<Option<BeaconBlock<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT);
// Check the cache.
if let Some(block) = self.block_cache.lock().get(block_root) {
metrics::inc_counter(&metrics::BEACON_BLOCK_CACHE_HIT_COUNT);
return Ok(Some(block.clone()));
}
// Fetch from database.
match self.get::<BeaconBlock<E>>(block_root)? {
Some(block) => {
// Add to cache.
self.block_cache.lock().put(*block_root, block.clone());
Ok(Some(block))
}
None => Ok(None),
}
}
/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
fn put_state(&self, state_root: &Hash256, state: BeaconState<E>) -> Result<(), Error> {
if state.slot < self.get_split_slot() {
self.store_cold_state(state_root, state)
self.store_cold_state(state_root, &state)
} else {
self.store_hot_state(state_root, state)
}
@ -113,14 +153,28 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
state_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<BeaconState<E>>, Error> {
self.get_state_with(state_root, slot, CloneConfig::all())
}
/// Get a state from the store.
///
/// Fetch a state from the store, controlling which cache fields are cloned.
fn get_state_with(
&self,
state_root: &Hash256,
slot: Option<Slot>,
clone_config: CloneConfig,
) -> Result<Option<BeaconState<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT);
if let Some(slot) = slot {
if slot < self.get_split_slot() {
self.load_cold_state_by_slot(slot).map(Some)
} else {
self.load_hot_state(state_root)
self.load_hot_state(state_root, clone_config)
}
} else {
match self.load_hot_state(state_root)? {
match self.load_hot_state(state_root, clone_config)? {
Some(state) => Ok(Some(state)),
None => self.load_cold_state(state_root),
}
@ -164,7 +218,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
for (state_root, slot) in
state_root_iter.take_while(|&(_, slot)| slot >= current_split_slot)
{
if slot % store.slots_per_restore_point == 0 {
if slot % store.config.slots_per_restore_point == 0 {
let state: BeaconState<E> = store
.hot_db
.get_state(&state_root, None)?
@ -229,9 +283,12 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
..
}) = self.load_hot_state_summary(state_root)?
{
// NOTE: minor inefficiency here because we load an unnecessary hot state summary
let state = self
.hot_db
.get_state(&epoch_boundary_state_root, None)?
.load_hot_state(
&epoch_boundary_state_root,
CloneConfig::committee_caches_only(),
)?
.ok_or_else(|| {
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root)
})?;
@ -257,17 +314,19 @@ impl<E: EthSpec> HotColdDB<E> {
pub fn open(
hot_path: &Path,
cold_path: &Path,
slots_per_restore_point: u64,
config: StoreConfig,
spec: ChainSpec,
log: Logger,
) -> Result<Self, Error> {
Self::verify_slots_per_restore_point(slots_per_restore_point)?;
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
let db = HotColdDB {
split: RwLock::new(Split::default()),
slots_per_restore_point,
cold_db: LevelDB::open(cold_path)?,
hot_db: LevelDB::open(hot_path)?,
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
state_cache: Mutex::new(LruCache::new(config.state_cache_size)),
config,
spec,
log,
_phantom: PhantomData,
@ -288,7 +347,7 @@ impl<E: EthSpec> HotColdDB<E> {
pub fn store_hot_state(
&self,
state_root: &Hash256,
state: &BeaconState<E>,
state: BeaconState<E>,
) -> Result<(), Error> {
// On the epoch boundary, store the full state.
if state.slot % E::slots_per_epoch() == 0 {
@ -298,13 +357,16 @@ impl<E: EthSpec> HotColdDB<E> {
"slot" => state.slot.as_u64(),
"state_root" => format!("{:?}", state_root)
);
self.hot_db.put_state(state_root, state)?;
store_full_state(&self.hot_db, state_root, &state)?;
}
// Store a summary of the state.
// We store one even for the epoch boundary states, as we may need their slots
// when doing a look up by state root.
self.store_hot_state_summary(state_root, state)?;
self.put_state_summary(state_root, HotStateSummary::new(state_root, &state)?)?;
// Store the state in the cache.
self.state_cache.lock().put(*state_root, state);
Ok(())
}
@ -312,14 +374,31 @@ impl<E: EthSpec> HotColdDB<E> {
/// Load a post-finalization state from the hot database.
///
/// Will replay blocks from the nearest epoch boundary.
pub fn load_hot_state(&self, state_root: &Hash256) -> Result<Option<BeaconState<E>>, Error> {
pub fn load_hot_state(
&self,
state_root: &Hash256,
clone_config: CloneConfig,
) -> Result<Option<BeaconState<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
// Check the cache.
if let Some(state) = self.state_cache.lock().get(state_root) {
metrics::inc_counter(&metrics::BEACON_STATE_CACHE_HIT_COUNT);
let timer = metrics::start_timer(&metrics::BEACON_STATE_CACHE_CLONE_TIME);
let state = state.clone_with(clone_config);
metrics::stop_timer(timer);
return Ok(Some(state));
}
if let Some(HotStateSummary {
slot,
latest_block_root,
epoch_boundary_state_root,
}) = self.load_hot_state_summary(state_root)?
{
let state: BeaconState<E> = self
let boundary_state = self
.hot_db
.get_state(&epoch_boundary_state_root, None)?
.ok_or_else(|| {
@ -328,12 +407,18 @@ impl<E: EthSpec> HotColdDB<E> {
// Optimization to avoid even *thinking* about replaying blocks if we're already
// on an epoch boundary.
if slot % E::slots_per_epoch() == 0 {
Ok(Some(state))
let state = if slot % E::slots_per_epoch() == 0 {
boundary_state
} else {
let blocks = self.load_blocks_to_replay(state.slot, slot, latest_block_root)?;
self.replay_blocks(state, blocks, slot).map(Some)
}
let blocks =
self.load_blocks_to_replay(boundary_state.slot, slot, latest_block_root)?;
self.replay_blocks(boundary_state, blocks, slot)?
};
// Update the LRU cache.
self.state_cache.lock().put(*state_root, state.clone());
Ok(Some(state))
} else {
Ok(None)
}
@ -348,7 +433,7 @@ impl<E: EthSpec> HotColdDB<E> {
state_root: &Hash256,
state: &BeaconState<E>,
) -> Result<(), Error> {
if state.slot % self.slots_per_restore_point != 0 {
if state.slot % self.config.slots_per_restore_point != 0 {
warn!(
self.log,
"Not storing non-restore point state in freezer";
@ -377,7 +462,7 @@ impl<E: EthSpec> HotColdDB<E> {
store_updated_vector(RandaoMixes, db, state, &self.spec)?;
// 3. Store restore point.
let restore_point_index = state.slot.as_u64() / self.slots_per_restore_point;
let restore_point_index = state.slot.as_u64() / self.config.slots_per_restore_point;
self.store_restore_point_hash(restore_point_index, *state_root)?;
Ok(())
@ -397,8 +482,8 @@ impl<E: EthSpec> HotColdDB<E> {
///
/// Will reconstruct the state if it lies between restore points.
pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result<BeaconState<E>, Error> {
if slot % self.slots_per_restore_point == 0 {
let restore_point_idx = slot.as_u64() / self.slots_per_restore_point;
if slot % self.config.slots_per_restore_point == 0 {
let restore_point_idx = slot.as_u64() / self.config.slots_per_restore_point;
self.load_restore_point_by_index(restore_point_idx)
} else {
self.load_cold_intermediate_state(slot)
@ -431,7 +516,7 @@ impl<E: EthSpec> HotColdDB<E> {
/// Load a frozen state that lies between restore points.
fn load_cold_intermediate_state(&self, slot: Slot) -> Result<BeaconState<E>, Error> {
// 1. Load the restore points either side of the intermediate state.
let low_restore_point_idx = slot.as_u64() / self.slots_per_restore_point;
let low_restore_point_idx = slot.as_u64() / self.config.slots_per_restore_point;
let high_restore_point_idx = low_restore_point_idx + 1;
// Acquire the read lock, so that the split can't change while this is happening.
@ -440,7 +525,7 @@ impl<E: EthSpec> HotColdDB<E> {
let low_restore_point = self.load_restore_point_by_index(low_restore_point_idx)?;
// If the slot of the high point lies outside the freezer, use the split state
// as the upper restore point.
let high_restore_point = if high_restore_point_idx * self.slots_per_restore_point
let high_restore_point = if high_restore_point_idx * self.config.slots_per_restore_point
>= split.slot.as_u64()
{
self.get_state(&split.state_root, Some(split.slot))?
@ -553,7 +638,8 @@ impl<E: EthSpec> HotColdDB<E> {
/// Fetch the slot of the most recently stored restore point.
pub fn get_latest_restore_point_slot(&self) -> Slot {
(self.get_split_slot() - 1) / self.slots_per_restore_point * self.slots_per_restore_point
(self.get_split_slot() - 1) / self.config.slots_per_restore_point
* self.config.slots_per_restore_point
}
/// Load the split point from disk.
@ -615,33 +701,6 @@ impl<E: EthSpec> HotColdDB<E> {
HotStateSummary::db_get(&self.hot_db, state_root)
}
/// Store a summary of a hot database state.
fn store_hot_state_summary(
&self,
state_root: &Hash256,
state: &BeaconState<E>,
) -> Result<(), Error> {
// Fill in the state root on the latest block header if necessary (this happens on all
// slots where there isn't a skip).
let latest_block_root = state.get_latest_block_root(*state_root);
let epoch_boundary_slot = state.slot / E::slots_per_epoch() * E::slots_per_epoch();
let epoch_boundary_state_root = if epoch_boundary_slot == state.slot {
*state_root
} else {
*state
.get_state_root(epoch_boundary_slot)
.map_err(HotColdDBError::HotStateSummaryError)?
};
HotStateSummary {
slot: state.slot,
latest_block_root,
epoch_boundary_state_root,
}
.db_put(&self.hot_db, state_root)
.map_err(Into::into)
}
/// Check that the restore point frequency is valid.
///
/// Specifically, check that it is:
@ -718,6 +777,29 @@ impl SimpleStoreItem for HotStateSummary {
}
}
impl HotStateSummary {
/// Construct a new summary of the given state.
pub fn new<E: EthSpec>(state_root: &Hash256, state: &BeaconState<E>) -> Result<Self, Error> {
// Fill in the state root on the latest block header if necessary (this happens on all
// slots where there isn't a skip).
let latest_block_root = state.get_latest_block_root(*state_root);
let epoch_boundary_slot = state.slot / E::slots_per_epoch() * E::slots_per_epoch();
let epoch_boundary_state_root = if epoch_boundary_slot == state.slot {
*state_root
} else {
*state
.get_state_root(epoch_boundary_slot)
.map_err(HotColdDBError::HotStateSummaryError)?
};
Ok(HotStateSummary {
slot: state.slot,
latest_block_root,
epoch_boundary_state_root,
})
}
}
/// Struct for summarising a state in the freezer database.
#[derive(Debug, Clone, Copy, Default, Encode, Decode)]
struct ColdStateSummary {

View File

@ -2,7 +2,7 @@ use crate::*;
use ssz::{Decode, DecodeError, Encode};
use ssz_derive::{Decode, Encode};
use std::convert::TryInto;
use types::beacon_state::{CommitteeCache, CACHED_EPOCHS};
use types::beacon_state::{CloneConfig, CommitteeCache, CACHED_EPOCHS};
pub fn store_full_state<S: Store<E>, E: EthSpec>(
store: &S,
@ -58,7 +58,7 @@ impl<T: EthSpec> StorageContainer<T> {
/// Create a new instance for storing a `BeaconState`.
pub fn new(state: &BeaconState<T>) -> Self {
Self {
state: state.clone_without_caches(),
state: state.clone_with(CloneConfig::none()),
committee_caches: state.committee_caches.to_vec(),
}
}

View File

@ -343,7 +343,7 @@ mod test {
let state_a_root = hashes.next().unwrap();
state_b.state_roots[0] = state_a_root;
store.put_state(&state_a_root, &state_a).unwrap();
store.put_state(&state_a_root, state_a).unwrap();
let iter = BlockRootsIterator::new(store, &state_b);
@ -391,8 +391,8 @@ mod test {
let state_a_root = Hash256::from_low_u64_be(slots_per_historical_root as u64);
let state_b_root = Hash256::from_low_u64_be(slots_per_historical_root as u64 * 2);
store.put_state(&state_a_root, &state_a).unwrap();
store.put_state(&state_b_root, &state_b).unwrap();
store.put_state(&state_a_root, state_a).unwrap();
store.put_state(&state_b_root, state_b.clone()).unwrap();
let iter = StateRootsIterator::new(store, &state_b);

View File

@ -123,8 +123,8 @@ impl<E: EthSpec> Store<E> for LevelDB<E> {
}
/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
store_full_state(self, state_root, state)
fn put_state(&self, state_root: &Hash256, state: BeaconState<E>) -> Result<(), Error> {
store_full_state(self, state_root, &state)
}
/// Fetch a state from the store.

View File

@ -22,6 +22,7 @@ mod leveldb_store;
mod memory_store;
mod metrics;
mod partial_beacon_state;
mod state_batch;
pub mod iter;
pub mod migrate;
@ -29,7 +30,7 @@ pub mod migrate;
use std::sync::Arc;
pub use self::config::StoreConfig;
pub use self::hot_cold_store::HotColdDB as DiskStore;
pub use self::hot_cold_store::{HotColdDB as DiskStore, HotStateSummary};
pub use self::leveldb_store::LevelDB as SimpleDiskStore;
pub use self::memory_store::MemoryStore;
pub use self::migrate::Migrate;
@ -37,6 +38,8 @@ pub use self::partial_beacon_state::PartialBeaconState;
pub use errors::Error;
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
pub use metrics::scrape_for_metrics;
pub use state_batch::StateBatch;
pub use types::beacon_state::CloneConfig;
pub use types::*;
/// An object capable of storing and retrieving objects implementing `StoreItem`.
@ -79,8 +82,29 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
I::db_delete(self, key)
}
/// Store a block in the store.
fn put_block(&self, block_root: &Hash256, block: BeaconBlock<E>) -> Result<(), Error> {
self.put(block_root, &block)
}
/// Fetch a block from the store.
fn get_block(&self, block_root: &Hash256) -> Result<Option<BeaconBlock<E>>, Error> {
self.get(block_root)
}
/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error>;
fn put_state(&self, state_root: &Hash256, state: BeaconState<E>) -> Result<(), Error>;
/// Store a state summary in the store.
// NOTE: this is a hack for the HotColdDb, we could consider splitting this
// trait and removing the generic `S: Store` types everywhere?
fn put_state_summary(
&self,
state_root: &Hash256,
summary: HotStateSummary,
) -> Result<(), Error> {
summary.db_put(self, state_root).map_err(Into::into)
}
/// Fetch a state from the store.
fn get_state(
@ -89,6 +113,17 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
slot: Option<Slot>,
) -> Result<Option<BeaconState<E>>, Error>;
/// Fetch a state from the store, controlling which cache fields are cloned.
fn get_state_with(
&self,
state_root: &Hash256,
slot: Option<Slot>,
_clone_config: CloneConfig,
) -> Result<Option<BeaconState<E>>, Error> {
// Default impl ignores config. Overriden in `HotColdDb`.
self.get_state(state_root, slot)
}
/// Given the root of an existing block in the store (`start_block_root`), return a parent
/// block with the specified `slot`.
///
@ -315,13 +350,12 @@ mod tests {
let hot_dir = tempdir().unwrap();
let cold_dir = tempdir().unwrap();
let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64;
let spec = MinimalEthSpec::default_spec();
let log = NullLoggerBuilder.build().unwrap();
let store = DiskStore::open(
&hot_dir.path(),
&cold_dir.path(),
slots_per_restore_point,
StoreConfig::default(),
spec,
log,
)

View File

@ -76,8 +76,8 @@ impl<E: EthSpec> Store<E> for MemoryStore<E> {
}
/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
store_full_state(self, state_root, state)
fn put_state(&self, state_root: &Hash256, state: BeaconState<E>) -> Result<(), Error> {
store_full_state(self, state_root, &state)
}
/// Fetch a state from the store.

View File

@ -46,6 +46,22 @@ lazy_static! {
/*
* Beacon State
*/
pub static ref BEACON_STATE_GET_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_state_get_total",
"Total number of beacon states requested from the store (cache or DB)"
);
pub static ref BEACON_STATE_HOT_GET_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_state_hot_get_total",
"Total number of hot beacon states requested from the store (cache or DB)"
);
pub static ref BEACON_STATE_CACHE_HIT_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_state_cache_hit_total",
"Number of hits to the store's state cache"
);
pub static ref BEACON_STATE_CACHE_CLONE_TIME: Result<Histogram> = try_create_histogram(
"store_beacon_state_cache_clone_time",
"Time to load a beacon block from the block cache"
);
pub static ref BEACON_STATE_READ_TIMES: Result<Histogram> = try_create_histogram(
"store_beacon_state_read_seconds",
"Total time required to read a BeaconState from the database"
@ -81,6 +97,14 @@ lazy_static! {
/*
* Beacon Block
*/
pub static ref BEACON_BLOCK_GET_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_block_get_total",
"Total number of beacon blocks requested from the store (cache or DB)"
);
pub static ref BEACON_BLOCK_CACHE_HIT_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_block_cache_hit_total",
"Number of hits to the store's block cache"
);
pub static ref BEACON_BLOCK_READ_TIMES: Result<Histogram> = try_create_histogram(
"store_beacon_block_read_overhead_seconds",
"Overhead on reading a beacon block from the DB (e.g., decoding)"

View File

@ -0,0 +1,47 @@
use crate::{Error, HotStateSummary, Store};
use types::{BeaconState, EthSpec, Hash256};
/// A collection of states to be stored in the database.
///
/// Consumes minimal space in memory by not storing states between epoch boundaries.
#[derive(Debug, Clone, Default)]
pub struct StateBatch<E: EthSpec> {
items: Vec<BatchItem<E>>,
}
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
enum BatchItem<E: EthSpec> {
Full(Hash256, BeaconState<E>),
Summary(Hash256, HotStateSummary),
}
impl<E: EthSpec> StateBatch<E> {
/// Create a new empty batch.
pub fn new() -> Self {
Self::default()
}
/// Stage a `BeaconState` to be stored.
pub fn add_state(&mut self, state_root: Hash256, state: &BeaconState<E>) -> Result<(), Error> {
let item = if state.slot % E::slots_per_epoch() == 0 {
BatchItem::Full(state_root, state.clone())
} else {
BatchItem::Summary(state_root, HotStateSummary::new(&state_root, state)?)
};
self.items.push(item);
Ok(())
}
/// Write the batch to the database.
///
/// May fail to write the full batch if any of the items error (i.e. not atomic!)
pub fn commit<S: Store<E>>(self, store: &S) -> Result<(), Error> {
self.items.into_iter().try_for_each(|item| match item {
BatchItem::Full(state_root, state) => store.put_state(&state_root, state),
BatchItem::Summary(state_root, summary) => {
store.put_state_summary(&state_root, summary)
}
})
}
}

View File

@ -1,7 +1,6 @@
//! These examples only really exist so we can use them for flamegraph. If they get annoying to
//! maintain, feel free to delete.
use ssz::{Decode, Encode};
use types::{
test_utils::generate_deterministic_keypair, BeaconState, Eth1Data, EthSpec, Hash256,
MinimalEthSpec, Validator,

View File

@ -17,11 +17,13 @@ use tree_hash::TreeHash;
use tree_hash_derive::TreeHash;
pub use self::committee_cache::CommitteeCache;
pub use clone_config::CloneConfig;
pub use eth_spec::*;
pub use tree_hash_cache::BeaconTreeHashCache;
#[macro_use]
mod committee_cache;
mod clone_config;
mod exit_cache;
mod pubkey_cache;
mod tests;
@ -948,7 +950,8 @@ impl<T: EthSpec> BeaconState<T> {
})
}
pub fn clone_without_caches(&self) -> Self {
/// Clone the state whilst preserving only the selected caches.
pub fn clone_with(&self, config: CloneConfig) -> Self {
BeaconState {
genesis_time: self.genesis_time,
slot: self.slot,
@ -970,21 +973,35 @@ impl<T: EthSpec> BeaconState<T> {
previous_justified_checkpoint: self.previous_justified_checkpoint.clone(),
current_justified_checkpoint: self.current_justified_checkpoint.clone(),
finalized_checkpoint: self.finalized_checkpoint.clone(),
committee_caches: [
committee_caches: if config.committee_caches {
self.committee_caches.clone()
} else {
[
CommitteeCache::default(),
CommitteeCache::default(),
CommitteeCache::default(),
],
pubkey_cache: PubkeyCache::default(),
exit_cache: ExitCache::default(),
tree_hash_cache: None,
]
},
pubkey_cache: if config.pubkey_cache {
self.pubkey_cache.clone()
} else {
PubkeyCache::default()
},
exit_cache: if config.exit_cache {
self.exit_cache.clone()
} else {
ExitCache::default()
},
tree_hash_cache: if config.tree_hash_cache {
self.tree_hash_cache.clone()
} else {
None
},
}
}
pub fn clone_with_only_committee_caches(&self) -> Self {
let mut state = self.clone_without_caches();
state.committee_caches = self.committee_caches.clone();
state
self.clone_with(CloneConfig::committee_caches_only())
}
}

View File

@ -0,0 +1,43 @@
/// Configuration struct for controlling which caches of a `BeaconState` should be cloned.
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct CloneConfig {
pub committee_caches: bool,
pub pubkey_cache: bool,
pub exit_cache: bool,
pub tree_hash_cache: bool,
}
impl CloneConfig {
pub fn all() -> Self {
Self {
committee_caches: true,
pubkey_cache: true,
exit_cache: true,
tree_hash_cache: true,
}
}
pub fn none() -> Self {
Self::default()
}
pub fn committee_caches_only() -> Self {
Self {
committee_caches: true,
..Self::none()
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn sanity() {
assert!(CloneConfig::all().pubkey_cache);
assert!(!CloneConfig::none().tree_hash_cache);
assert!(CloneConfig::committee_caches_only().committee_caches);
assert!(!CloneConfig::committee_caches_only().exit_cache);
}
}

View File

@ -125,6 +125,75 @@ fn cache_initialization() {
test_cache_initialization(&mut state, RelativeEpoch::Next, &spec);
}
fn test_clone_config<E: EthSpec>(base_state: &BeaconState<E>, clone_config: CloneConfig) {
let state = base_state.clone_with(clone_config.clone());
if clone_config.committee_caches {
state
.committee_cache(RelativeEpoch::Previous)
.expect("committee cache exists");
state
.committee_cache(RelativeEpoch::Current)
.expect("committee cache exists");
state
.committee_cache(RelativeEpoch::Next)
.expect("committee cache exists");
} else {
state
.committee_cache(RelativeEpoch::Previous)
.expect_err("shouldn't exist");
state
.committee_cache(RelativeEpoch::Current)
.expect_err("shouldn't exist");
state
.committee_cache(RelativeEpoch::Next)
.expect_err("shouldn't exist");
}
if clone_config.pubkey_cache {
assert_ne!(state.pubkey_cache.len(), 0);
} else {
assert_eq!(state.pubkey_cache.len(), 0);
}
if clone_config.exit_cache {
state
.exit_cache
.check_initialized()
.expect("exit cache exists");
} else {
state
.exit_cache
.check_initialized()
.expect_err("exit cache doesn't exist");
}
if clone_config.tree_hash_cache {
assert!(state.tree_hash_cache.is_some());
} else {
assert!(state.tree_hash_cache.is_none(), "{:?}", clone_config);
}
}
#[test]
fn clone_config() {
let spec = MinimalEthSpec::default_spec();
let builder: TestingBeaconStateBuilder<MinimalEthSpec> =
TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(16, &spec);
let (mut state, _keypairs) = builder.build();
state.build_all_caches(&spec).unwrap();
let num_caches = 4;
let all_configs = (0..2u8.pow(num_caches)).map(|i| CloneConfig {
committee_caches: (i & 1) != 0,
pubkey_cache: ((i >> 1) & 1) != 0,
exit_cache: ((i >> 2) & 1) != 0,
tree_hash_cache: ((i >> 3) & 1) != 0,
});
for config in all_configs {
test_clone_config(&state, config);
}
}
#[test]
fn tree_hash_cache() {
use crate::test_utils::{SeedableRng, TestRandom, XorShiftRng};