Improve freezer DB efficiency with periodic restore points (#649)
* Draft of checkpoint freezer DB
* Fix bugs
* Adjust root iterators for checkpoint database
* Fix freezer state lookups with no slot hint
* Fix split comment
* Use "restore point" to refer to frozen states
* Resolve some FIXMEs
* Configurable slots per restore point
* Document new freezer DB functions
* Fix up StoreConfig
* Fix new test for merge
* Document SPRP default CLI flag, clarify tests
This commit is contained in:
parent 5a765396b7
commit d0319320ce
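As a rough sketch of the tradeoff this commit introduces (illustrative only; `sprp` stands in for the `slots_per_restore_point` parameter used throughout the diff below): the freezer DB now stores one full "restore point" state every `sprp` slots, and reconstructs any other frozen state by replaying blocks on top of the previous restore point.

    // Illustrative sketch, not part of the commit: restore points sit at slots
    // 0, sprp, 2*sprp, ..., so the number of full states kept for a span of
    // finalized slots shrinks by roughly a factor of `sprp`, at the cost of up
    // to `sprp - 1` slots of block replay per intermediate-state load.
    fn full_states_stored(finalized_slots: u64, sprp: u64) -> u64 {
        finalized_slots / sprp + 1
    }

    fn worst_case_replay_slots(sprp: u64) -> u64 {
        sprp - 1
    }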
@@ -27,9 +27,11 @@ fn get_store(db_path: &TempDir) -> Arc<DiskStore> {
     let spec = E::default_spec();
     let hot_path = db_path.path().join("hot_db");
     let cold_path = db_path.path().join("cold_db");
+    let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64;
     let log = NullLoggerBuilder.build().expect("logger should build");
     Arc::new(
-        DiskStore::open(&hot_path, &cold_path, spec, log).expect("disk store should initialize"),
+        DiskStore::open::<E>(&hot_path, &cold_path, slots_per_restore_point, spec, log)
+            .expect("disk store should initialize"),
     )
 }
@@ -9,7 +9,7 @@ use beacon_chain::test_utils::{
 use rand::Rng;
 use sloggers::{null::NullLoggerBuilder, Build};
 use std::sync::Arc;
-use store::DiskStore;
+use store::{DiskStore, Store};
 use tempfile::{tempdir, TempDir};
 use tree_hash::TreeHash;
 use types::test_utils::{SeedableRng, XorShiftRng};
@@ -30,9 +30,11 @@ fn get_store(db_path: &TempDir) -> Arc<DiskStore> {
     let spec = MinimalEthSpec::default_spec();
     let hot_path = db_path.path().join("hot_db");
     let cold_path = db_path.path().join("cold_db");
+    let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64;
     let log = NullLoggerBuilder.build().expect("logger should build");
     Arc::new(
-        DiskStore::open(&hot_path, &cold_path, spec, log).expect("disk store should initialize"),
+        DiskStore::open::<E>(&hot_path, &cold_path, slots_per_restore_point, spec, log)
+            .expect("disk store should initialize"),
     )
 }
@@ -62,6 +64,7 @@ fn full_participation_no_skips() {
     check_finalization(&harness, num_blocks_produced);
     check_split_slot(&harness, store);
     check_chain_dump(&harness, num_blocks_produced + 1);
+    check_iterators(&harness);
 }
 
 #[test]
@@ -99,6 +102,7 @@ fn randomised_skips() {
 
     check_split_slot(&harness, store);
     check_chain_dump(&harness, num_blocks_produced + 1);
+    check_iterators(&harness);
 }
 
 #[test]
@@ -140,6 +144,7 @@ fn long_skip() {
     check_finalization(&harness, initial_blocks + skip_slots + final_blocks);
     check_split_slot(&harness, store);
     check_chain_dump(&harness, initial_blocks + final_blocks + 1);
+    check_iterators(&harness);
 }
 
 /// Go forward to the point where the genesis randao value is no longer part of the vector.
@@ -201,6 +206,7 @@ fn randao_genesis_storage() {
     check_finalization(&harness, num_slots);
     check_split_slot(&harness, store);
     check_chain_dump(&harness, num_slots + 1);
+    check_iterators(&harness);
 }
 
 // Check that closing and reopening a freezer DB restores the split slot to its correct value.
@@ -281,10 +287,44 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
     assert_eq!(chain_dump.len() as u64, expected_len);
 
     for checkpoint in chain_dump {
+        // Check that the tree hash of the stored state is as expected
         assert_eq!(
             checkpoint.beacon_state_root,
             Hash256::from_slice(&checkpoint.beacon_state.tree_hash_root()),
             "tree hash of stored state is incorrect"
         );
+
+        // Check that looking up the state root with no slot hint succeeds.
+        // This tests the state root -> slot mapping.
+        assert_eq!(
+            harness
+                .chain
+                .store
+                .get_state::<E>(&checkpoint.beacon_state_root, None)
+                .expect("no error")
+                .expect("state exists")
+                .slot,
+            checkpoint.beacon_state.slot
+        );
     }
 }
+
+/// Check that state and block root iterators can reach genesis
+fn check_iterators(harness: &TestHarness) {
+    assert_eq!(
+        harness
+            .chain
+            .rev_iter_state_roots()
+            .last()
+            .map(|(_, slot)| slot),
+        Some(Slot::new(0))
+    );
+    assert_eq!(
+        harness
+            .chain
+            .rev_iter_block_roots()
+            .last()
+            .map(|(_, slot)| slot),
+        Some(Slot::new(0))
+    );
+}
@@ -577,7 +577,12 @@ where
     TEventHandler: EventHandler<TEthSpec> + 'static,
 {
     /// Specifies that the `Client` should use a `DiskStore` database.
-    pub fn disk_store(mut self, hot_path: &Path, cold_path: &Path) -> Result<Self, String> {
+    pub fn disk_store(
+        mut self,
+        hot_path: &Path,
+        cold_path: &Path,
+        slots_per_restore_point: u64,
+    ) -> Result<Self, String> {
         let context = self
             .runtime_context
             .as_ref()
@@ -588,8 +593,14 @@ where
             .clone()
             .ok_or_else(|| "disk_store requires a chain spec".to_string())?;
 
-        let store = DiskStore::open(hot_path, cold_path, spec, context.log)
-            .map_err(|e| format!("Unable to open database: {:?}", e).to_string())?;
+        let store = DiskStore::open::<TEthSpec>(
+            hot_path,
+            cold_path,
+            slots_per_restore_point,
+            spec,
+            context.log,
+        )
+        .map_err(|e| format!("Unable to open database: {:?}", e).to_string())?;
         self.store = Some(Arc::new(store));
         Ok(self)
     }
@@ -6,9 +6,6 @@ use std::path::PathBuf;
 /// The number initial validators when starting the `Minimal`.
 const TESTNET_SPEC_CONSTANTS: &str = "minimal";
 
-/// Default directory name for the freezer database under the top-level data dir.
-const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db";
-
 /// Defines how the client should initialize the `BeaconChain` and other components.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum ClientGenesis {
@@ -45,9 +42,6 @@ impl Default for ClientGenesis {
 pub struct Config {
     pub data_dir: PathBuf,
     pub testnet_dir: Option<PathBuf>,
-    pub db_type: String,
-    pub db_name: String,
-    pub freezer_db_path: Option<PathBuf>,
     pub log_file: PathBuf,
     pub spec_constants: String,
     /// If true, the node will use co-ordinated junk for eth1 values.
@@ -59,6 +53,7 @@ pub struct Config {
     /// The `genesis` field is not serialized or deserialized by `serde` to ensure it is defined
     /// via the CLI at runtime, instead of from a configuration file saved to disk.
     pub genesis: ClientGenesis,
+    pub store: store::StoreConfig,
     pub network: network::NetworkConfig,
     pub rest_api: rest_api::Config,
     pub websocket_server: websocket_server::Config,
@@ -71,10 +66,8 @@ impl Default for Config {
             data_dir: PathBuf::from(".lighthouse"),
             testnet_dir: None,
             log_file: PathBuf::from(""),
-            db_type: "disk".to_string(),
-            db_name: "chain_db".to_string(),
-            freezer_db_path: None,
             genesis: <_>::default(),
+            store: <_>::default(),
             network: NetworkConfig::default(),
             rest_api: <_>::default(),
             websocket_server: <_>::default(),
@@ -90,7 +83,7 @@ impl Config {
     /// Get the database path without initialising it.
     pub fn get_db_path(&self) -> Option<PathBuf> {
         self.get_data_dir()
-            .map(|data_dir| data_dir.join(&self.db_name))
+            .map(|data_dir| data_dir.join(&self.store.db_name))
     }
 
     /// Get the database path, creating it if necessary.
@@ -104,7 +97,7 @@ impl Config {
     /// Fetch default path to use for the freezer database.
     fn default_freezer_db_path(&self) -> Option<PathBuf> {
         self.get_data_dir()
-            .map(|data_dir| data_dir.join(DEFAULT_FREEZER_DB_DIR))
+            .map(|data_dir| data_dir.join(self.store.default_freezer_db_dir()))
     }
 
     /// Returns the path to which the client may initialize the on-disk freezer database.
@@ -112,7 +105,8 @@ impl Config {
     /// Will attempt to use the user-supplied path from e.g. the CLI, or will default
     /// to a directory in the data_dir if no path is provided.
     pub fn get_freezer_db_path(&self) -> Option<PathBuf> {
-        self.freezer_db_path
+        self.store
+            .freezer_db_path
            .clone()
            .or_else(|| self.default_freezer_db_path())
     }
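For illustration (standalone sketch with assumed names, not part of the commit), the resolution above boils down to: an explicit `--freezer-dir` wins, otherwise the default directory inside `data_dir` is used.

    use std::path::PathBuf;

    // "freezer_db" is the DEFAULT_FREEZER_DB_DIR constant that this commit
    // moves into StoreConfig (see the new config.rs below).
    fn resolve_freezer_path(cli_override: Option<PathBuf>, data_dir: PathBuf) -> PathBuf {
        cli_override.unwrap_or_else(|| data_dir.join("freezer_db"))
    }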
@@ -1,4 +1,5 @@
 use clap::{App, Arg, SubCommand};
+use store::StoreConfig;
 
 pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
     App::new("beacon_node")
@@ -188,6 +189,20 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 .takes_value(true)
                 .default_value("http://localhost:8545")
         )
+        .arg(
+            Arg::with_name("slots-per-restore-point")
+                .long("slots-per-restore-point")
+                .value_name("SLOT_COUNT")
+                .help("Specifies how often a freezer DB restore point should be stored. \
+                       DO NOT CHANGE AFTER INITIALIZATION.")
+                .takes_value(true)
+                .default_value(
+                    Box::leak(
+                        format!("{}", StoreConfig::default().slots_per_restore_point)
+                            .into_boxed_str()
+                    )
+                )
+        )
         /*
          * The "testnet" sub-command.
          *
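A note on the `Box::leak` in the default value above: clap 2.x borrows default values as string slices for the life of the `App`, so a default computed at runtime has to be promoted to the `'static` lifetime. A minimal sketch of the idiom (standalone, not from the diff):

    // Leaks one small heap string at startup so clap can hold it for the
    // lifetime of the process; harmless for a one-off default value.
    fn static_default(n: u64) -> &'static str {
        Box::leak(n.to_string().into_boxed_str())
    }

Operators can then override the default with, e.g., `--slots-per-restore-point 2048` (any non-zero value that divides `SLOTS_PER_HISTORICAL_ROOT`), but, as the help text warns, only before the database is first initialized.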
@@ -233,7 +233,13 @@ pub fn get_configs<E: EthSpec>(
     };
 
     if let Some(freezer_dir) = cli_args.value_of("freezer-dir") {
-        client_config.freezer_db_path = Some(PathBuf::from(freezer_dir));
+        client_config.store.freezer_db_path = Some(PathBuf::from(freezer_dir));
+    }
+
+    if let Some(slots_per_restore_point) = cli_args.value_of("slots-per-restore-point") {
+        client_config.store.slots_per_restore_point = slots_per_restore_point
+            .parse()
+            .map_err(|_| "slots-per-restore-point is not a valid integer".to_string())?;
     }
 
     if eth2_config.spec_constants != client_config.spec_constants {
@@ -79,6 +79,7 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
         let spec = context.eth2_config().spec.clone();
         let genesis_eth1_config = client_config.eth1.clone();
         let client_genesis = client_config.genesis.clone();
+        let store_config = client_config.store.clone();
         let log = context.log.clone();
 
         let db_path_res = client_config.create_db_path();
@@ -90,7 +91,11 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
             Ok(ClientBuilder::new(context.eth_spec_instance.clone())
                 .runtime_context(context)
                 .chain_spec(spec)
-                .disk_store(&db_path, &freezer_db_path_res?)?
+                .disk_store(
+                    &db_path,
+                    &freezer_db_path_res?,
+                    store_config.slots_per_restore_point,
+                )?
                 .background_migrator()?)
         })
         .and_then(move |builder| {
@@ -16,6 +16,9 @@ eth2_ssz = "0.1.2"
 eth2_ssz_derive = "0.1.0"
 tree_hash = "0.1.0"
 types = { path = "../../eth2/types" }
+state_processing = { path = "../../eth2/state_processing" }
 slog = "2.2.3"
+serde = "1.0"
+serde_derive = "1.0.102"
 lazy_static = "1.4.0"
 lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" }
beacon_node/store/src/config.rs (new file, 32 lines)
@@ -0,0 +1,32 @@
+use serde_derive::{Deserialize, Serialize};
+use std::path::PathBuf;
+use types::{EthSpec, MinimalEthSpec};
+
+/// Default directory name for the freezer database under the top-level data dir.
+const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db";
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StoreConfig {
+    /// Name of the directory inside the data directory where the main "hot" DB is located.
+    pub db_name: String,
+    /// Path where the freezer database will be located.
+    pub freezer_db_path: Option<PathBuf>,
+    /// Number of slots to wait between storing restore points in the freezer database.
+    pub slots_per_restore_point: u64,
+}
+
+impl Default for StoreConfig {
+    fn default() -> Self {
+        Self {
+            db_name: "chain_db".to_string(),
+            freezer_db_path: None,
+            slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
+        }
+    }
+}
+
+impl StoreConfig {
+    pub fn default_freezer_db_dir(&self) -> &'static str {
+        DEFAULT_FREEZER_DB_DIR
+    }
+}
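Since all fields are public, a custom restore point frequency can be set in code with struct update syntax (hypothetical call site, not part of the commit):

    use store::StoreConfig;

    fn custom_store_config() -> StoreConfig {
        StoreConfig {
            // 2048 divides SLOTS_PER_HISTORICAL_ROOT (8192 on mainnet), so it
            // passes the divisibility check that DiskStore::open performs below.
            slots_per_restore_point: 2048,
            ..StoreConfig::default()
        }
    }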
@@ -1,30 +1,43 @@
 use crate::chunked_vector::{
     store_updated_vector, BlockRoots, HistoricalRoots, RandaoMixes, StateRoots,
 };
-use crate::iter::StateRootsIterator;
+use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
 use crate::{
     leveldb_store::LevelDB, DBColumn, Error, PartialBeaconState, SimpleStoreItem, Store, StoreItem,
 };
 use parking_lot::RwLock;
-use slog::{info, trace, Logger};
+use slog::{debug, trace, warn, Logger};
 use ssz::{Decode, Encode};
+use ssz_derive::{Decode, Encode};
+use state_processing::{
+    per_block_processing, per_slot_processing, BlockProcessingError, BlockSignatureStrategy,
+    SlotProcessingError,
+};
 use std::convert::TryInto;
 use std::path::Path;
 use std::sync::Arc;
 use types::*;
 
-/// 32-byte key for accessing the `split_slot` of the freezer DB.
-pub const SPLIT_SLOT_DB_KEY: &str = "FREEZERDBSPLITSLOTFREEZERDBSPLIT";
+/// 32-byte key for accessing the `split` of the freezer DB.
+pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE";
 
+/// On-disk database that stores finalized states efficiently.
+///
+/// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores
+/// intermittent "restore point" states pre-finalization.
 pub struct HotColdDB {
-    /// The slot before which all data is stored in the cold database.
+    /// The slot and state root at the point where the database is split between hot and cold.
     ///
-    /// Data for slots less than `split_slot` is in the cold DB, while data for slots
-    /// greater than or equal is in the hot DB.
-    split_slot: RwLock<Slot>,
+    /// States with slots less than `split.slot` are in the cold DB, while states with slots
+    /// greater than or equal are in the hot DB.
+    split: RwLock<Split>,
+    /// Number of slots per restore point state in the freezer database.
+    slots_per_restore_point: u64,
     /// Cold database containing compact historical data.
     cold_db: LevelDB,
     /// Hot database containing duplicated but quick-to-access recent data.
+    ///
+    /// The hot database also contains all blocks.
     hot_db: LevelDB,
     /// Chain spec.
     spec: ChainSpec,
@@ -38,6 +51,24 @@ pub enum HotColdDbError {
         current_split_slot: Slot,
         proposed_split_slot: Slot,
     },
+    MissingStateToFreeze(Hash256),
+    MissingRestorePointHash(u64),
+    MissingRestorePoint(Hash256),
+    MissingStateSlot(Hash256),
+    MissingSplitState(Hash256, Slot),
+    RestorePointDecodeError(ssz::DecodeError),
+    RestorePointReplayFailure {
+        expected_state_root: Hash256,
+        observed_state_root: Hash256,
+    },
+    BlockReplayBeaconError(BeaconStateError),
+    BlockReplaySlotError(SlotProcessingError),
+    BlockReplayBlockError(BlockProcessingError),
+    InvalidSlotsPerRestorePoint {
+        slots_per_restore_point: u64,
+        slots_per_historical_root: u64,
+    },
+    RestorePointBlockHashError(BeaconStateError),
 }
 
 impl Store for HotColdDB {
@@ -79,24 +110,31 @@ impl Store for HotColdDB {
     ) -> Result<Option<BeaconState<E>>, Error> {
         if let Some(slot) = slot {
             if slot < self.get_split_slot() {
-                self.load_archive_state(state_root)
+                self.load_archive_state(state_root, slot).map(Some)
             } else {
                 self.hot_db.get_state(state_root, None)
             }
         } else {
             match self.hot_db.get_state(state_root, None)? {
                 Some(state) => Ok(Some(state)),
-                None => self.load_archive_state(state_root),
+                None => {
+                    // Look-up the state in the freezer DB. We don't know the slot, so we must
+                    // look it up separately and then use it to reconstruct the state from a
+                    // restore point.
+                    let slot = self.load_state_slot(state_root)?;
+                    self.load_archive_state(state_root, slot).map(Some)
+                }
             }
         }
     }
 
+    /// Advance the split point of the store, moving new finalized states to the freezer.
     fn freeze_to_state<E: EthSpec>(
         store: Arc<Self>,
-        _frozen_head_root: Hash256,
+        frozen_head_root: Hash256,
         frozen_head: &BeaconState<E>,
     ) -> Result<(), Error> {
-        info!(
+        debug!(
             store.log,
             "Freezer migration started";
             "slot" => frozen_head.slot
@@ -119,25 +157,28 @@ impl Store for HotColdDB {
         for (state_root, slot) in
             state_root_iter.take_while(|&(_, slot)| slot >= current_split_slot)
         {
-            trace!(store.log, "Freezing";
-                "slot" => slot,
-                "state_root" => format!("{}", state_root));
-
-            let state: BeaconState<E> = match store.hot_db.get_state(&state_root, None)? {
-                Some(s) => s,
-                // If there's no state it could be a skip slot, which is fine, our job is just
-                // to move everything that was in the hot DB to the cold.
-                None => continue,
-            };
+            if slot % store.slots_per_restore_point == 0 {
+                let state: BeaconState<E> = store
+                    .hot_db
+                    .get_state(&state_root, None)?
+                    .ok_or_else(|| HotColdDbError::MissingStateToFreeze(state_root))?;
+
+                store.store_archive_state(&state_root, &state)?;
+            }
+
+            // Store a pointer from this state root to its slot, so we can later reconstruct
+            // states from their state root alone.
+            store.store_state_slot(&state_root, slot)?;
 
             to_delete.push(state_root);
-
-            store.store_archive_state(&state_root, &state)?;
         }
 
         // 2. Update the split slot
-        *store.split_slot.write() = frozen_head.slot;
-        store.store_split_slot()?;
+        *store.split.write() = Split {
+            slot: frozen_head.slot,
+            state_root: frozen_head_root,
+        };
+        store.store_split()?;
 
         // 3. Delete from the hot DB
         for state_root in to_delete {
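To make the freezing loop above concrete, a small worked example (standalone sketch; `sprp` stands in for `store.slots_per_restore_point`):

    // Freezing slots 0..=192 with sprp = 64 archives full states only at
    // slots 0, 64, 128 and 192; every frozen slot additionally gets a
    // state_root -> slot entry so it can be found without a slot hint.
    fn archived_slots(split_slot: u64, frozen_head_slot: u64, sprp: u64) -> Vec<u64> {
        (split_slot..=frozen_head_slot)
            .filter(|slot| slot % sprp == 0)
            .collect()
    }

    fn main() {
        assert_eq!(archived_slots(0, 192, 64), vec![0, 64, 128, 192]);
    }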
@@ -146,7 +187,7 @@ impl Store for HotColdDB {
                 .key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())?;
         }
 
-        info!(
+        debug!(
             store.log,
             "Freezer migration complete";
             "slot" => frozen_head.slot
@@ -157,38 +198,60 @@ impl Store for HotColdDB {
 }
 
 impl HotColdDB {
-    pub fn open(
+    /// Open a new or existing database, with the given paths to the hot and cold DBs.
+    ///
+    /// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`.
+    pub fn open<E: EthSpec>(
         hot_path: &Path,
         cold_path: &Path,
+        slots_per_restore_point: u64,
         spec: ChainSpec,
         log: Logger,
     ) -> Result<Self, Error> {
+        Self::verify_slots_per_restore_point::<E>(slots_per_restore_point)?;
+
         let db = HotColdDB {
-            split_slot: RwLock::new(Slot::new(0)),
+            split: RwLock::new(Split::default()),
+            slots_per_restore_point,
             cold_db: LevelDB::open(cold_path)?,
             hot_db: LevelDB::open(hot_path)?,
             spec,
             log,
         };
 
         // Load the previous split slot from the database (if any). This ensures we can
         // stop and restart correctly.
-        if let Some(split_slot) = db.load_split_slot()? {
-            *db.split_slot.write() = split_slot;
+        if let Some(split) = db.load_split()? {
+            *db.split.write() = split;
         }
         Ok(db)
     }
 
+    /// Store a pre-finalization state in the freezer database.
+    ///
+    /// Will log a warning and do nothing if the state does not lie on a restore point boundary.
     pub fn store_archive_state<E: EthSpec>(
         &self,
         state_root: &Hash256,
         state: &BeaconState<E>,
     ) -> Result<(), Error> {
+        if state.slot % self.slots_per_restore_point != 0 {
+            warn!(
+                self.log,
+                "Not storing non-restore point state in freezer";
+                "slot" => state.slot.as_u64(),
+                "state_root" => format!("{:?}", state_root)
+            );
+            return Ok(());
+        }
+
         trace!(
             self.log,
-            "Freezing state";
-            "slot" => state.slot.as_u64(),
+            "Creating restore point";
+            "slot" => state.slot,
             "state_root" => format!("{:?}", state_root)
         );
 
         // 1. Convert to PartialBeaconState and store that in the DB.
         let partial_state = PartialBeaconState::from_state_forgetful(state);
         partial_state.db_put(&self.cold_db, state_root)?;
@@ -200,17 +263,35 @@ impl HotColdDB {
         store_updated_vector(HistoricalRoots, db, state, &self.spec)?;
         store_updated_vector(RandaoMixes, db, state, &self.spec)?;
 
+        // 3. Store restore point.
+        let restore_point_index = state.slot.as_u64() / self.slots_per_restore_point;
+        self.store_restore_point_hash(restore_point_index, *state_root)?;
+
         Ok(())
     }
 
+    /// Load a pre-finalization state from the freezer database.
+    ///
+    /// Will reconstruct the state if it lies between restore points.
     pub fn load_archive_state<E: EthSpec>(
         &self,
         state_root: &Hash256,
-    ) -> Result<Option<BeaconState<E>>, Error> {
-        let mut partial_state = match PartialBeaconState::db_get(&self.cold_db, state_root)? {
-            Some(s) => s,
-            None => return Ok(None),
-        };
+        slot: Slot,
+    ) -> Result<BeaconState<E>, Error> {
+        if slot % self.slots_per_restore_point == 0 {
+            self.load_restore_point(state_root)
+        } else {
+            self.load_intermediate_state(state_root, slot)
+        }
+    }
+
+    /// Load a restore point state by its `state_root`.
+    fn load_restore_point<E: EthSpec>(
+        &self,
+        state_root: &Hash256,
+    ) -> Result<BeaconState<E>, Error> {
+        let mut partial_state = PartialBeaconState::db_get(&self.cold_db, state_root)?
+            .ok_or_else(|| HotColdDbError::MissingRestorePoint(*state_root))?;
 
         // Fill in the fields of the partial state.
         partial_state.load_block_roots(&self.cold_db, &self.spec)?;
|
|||||||
partial_state.load_historical_roots(&self.cold_db, &self.spec)?;
|
partial_state.load_historical_roots(&self.cold_db, &self.spec)?;
|
||||||
partial_state.load_randao_mixes(&self.cold_db, &self.spec)?;
|
partial_state.load_randao_mixes(&self.cold_db, &self.spec)?;
|
||||||
|
|
||||||
let state: BeaconState<E> = partial_state.try_into()?;
|
Ok(partial_state.try_into()?)
|
||||||
|
|
||||||
Ok(Some(state))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Load a restore point state by its `restore_point_index`.
|
||||||
|
fn load_restore_point_by_index<E: EthSpec>(
|
||||||
|
&self,
|
||||||
|
restore_point_index: u64,
|
||||||
|
) -> Result<BeaconState<E>, Error> {
|
||||||
|
let state_root = self.load_restore_point_hash(restore_point_index)?;
|
||||||
|
self.load_restore_point(&state_root)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Load a state that lies between restore points.
|
||||||
|
fn load_intermediate_state<E: EthSpec>(
|
||||||
|
&self,
|
||||||
|
state_root: &Hash256,
|
||||||
|
slot: Slot,
|
||||||
|
) -> Result<BeaconState<E>, Error> {
|
||||||
|
// 1. Load the restore points either side of the intermediate state.
|
||||||
|
let low_restore_point_idx = slot.as_u64() / self.slots_per_restore_point;
|
||||||
|
let high_restore_point_idx = low_restore_point_idx + 1;
|
||||||
|
|
||||||
|
// Acquire the read lock, so that the split can't change while this is happening.
|
||||||
|
let split = self.split.read();
|
||||||
|
|
||||||
|
let low_restore_point = self.load_restore_point_by_index(low_restore_point_idx)?;
|
||||||
|
// If the slot of the high point lies outside the freezer, use the split state
|
||||||
|
// as the upper restore point.
|
||||||
|
let high_restore_point = if high_restore_point_idx * self.slots_per_restore_point
|
||||||
|
>= split.slot.as_u64()
|
||||||
|
{
|
||||||
|
self.get_state::<E>(&split.state_root, Some(split.slot))?
|
||||||
|
.ok_or_else(|| HotColdDbError::MissingSplitState(split.state_root, split.slot))?
|
||||||
|
} else {
|
||||||
|
self.load_restore_point_by_index(high_restore_point_idx)?
|
||||||
|
};
|
||||||
|
|
||||||
|
// 2. Load the blocks from the high restore point back to the low restore point.
|
||||||
|
let blocks = self.load_blocks_to_replay(
|
||||||
|
low_restore_point.slot,
|
||||||
|
slot,
|
||||||
|
self.get_high_restore_point_block_root(&high_restore_point, slot)?,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
// 3. Replay the blocks on top of the low restore point.
|
||||||
|
let mut state = self.replay_blocks(low_restore_point, blocks, slot)?;
|
||||||
|
|
||||||
|
// 4. Check that the state root is correct (should be quick with the cache already built).
|
||||||
|
// TODO: we could optimise out *all* the tree hashing when replaying blocks,
|
||||||
|
// in which case we could also drop this check.
|
||||||
|
let observed_state_root = state.update_tree_hash_cache()?;
|
||||||
|
|
||||||
|
if observed_state_root == *state_root {
|
||||||
|
Ok(state)
|
||||||
|
} else {
|
||||||
|
Err(HotColdDbError::RestorePointReplayFailure {
|
||||||
|
expected_state_root: *state_root,
|
||||||
|
observed_state_root,
|
||||||
|
}
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a suitable block root for backtracking from `high_restore_point` to the state at `slot`.
|
||||||
|
///
|
||||||
|
/// Defaults to the block root for `slot`, which *should* be in range.
|
||||||
|
fn get_high_restore_point_block_root<E: EthSpec>(
|
||||||
|
&self,
|
||||||
|
high_restore_point: &BeaconState<E>,
|
||||||
|
slot: Slot,
|
||||||
|
) -> Result<Hash256, HotColdDbError> {
|
||||||
|
high_restore_point
|
||||||
|
.get_block_root(slot)
|
||||||
|
.or_else(|_| high_restore_point.get_oldest_block_root())
|
||||||
|
.map(|x| *x)
|
||||||
|
.map_err(HotColdDbError::RestorePointBlockHashError)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Load the blocks between `start_slot` and `end_slot` by backtracking from `end_block_hash`.
|
||||||
|
///
|
||||||
|
/// Blocks are returned in slot-ascending order, suitable for replaying on a state with slot
|
||||||
|
/// equal to `start_slot`, to reach a state with slot equal to `end_slot`.
|
||||||
|
fn load_blocks_to_replay<E: EthSpec>(
|
||||||
|
&self,
|
||||||
|
start_slot: Slot,
|
||||||
|
end_slot: Slot,
|
||||||
|
end_block_hash: Hash256,
|
||||||
|
) -> Result<Vec<BeaconBlock<E>>, Error> {
|
||||||
|
let mut blocks = ParentRootBlockIterator::new(self, end_block_hash)
|
||||||
|
// Include the block at the end slot (if any), it needs to be
|
||||||
|
// replayed in order to construct the canonical state at `end_slot`.
|
||||||
|
.filter(|block| block.slot <= end_slot)
|
||||||
|
// Exclude the block at the start slot (if any), because it has already
|
||||||
|
// been applied to the starting state.
|
||||||
|
.take_while(|block| block.slot > start_slot)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
blocks.reverse();
|
||||||
|
Ok(blocks)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Replay `blocks` on top of `state` until `target_slot` is reached.
|
||||||
|
///
|
||||||
|
/// Will skip slots as necessary.
|
||||||
|
fn replay_blocks<E: EthSpec>(
|
||||||
|
&self,
|
||||||
|
mut state: BeaconState<E>,
|
||||||
|
blocks: Vec<BeaconBlock<E>>,
|
||||||
|
target_slot: Slot,
|
||||||
|
) -> Result<BeaconState<E>, Error> {
|
||||||
|
state
|
||||||
|
.build_all_caches(&self.spec)
|
||||||
|
.map_err(HotColdDbError::BlockReplayBeaconError)?;
|
||||||
|
|
||||||
|
for block in blocks {
|
||||||
|
while state.slot < block.slot {
|
||||||
|
per_slot_processing(&mut state, &self.spec)
|
||||||
|
.map_err(HotColdDbError::BlockReplaySlotError)?;
|
||||||
|
}
|
||||||
|
per_block_processing(
|
||||||
|
&mut state,
|
||||||
|
&block,
|
||||||
|
None,
|
||||||
|
BlockSignatureStrategy::NoVerification,
|
||||||
|
&self.spec,
|
||||||
|
)
|
||||||
|
.map_err(HotColdDbError::BlockReplayBlockError)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
while state.slot < target_slot {
|
||||||
|
per_slot_processing(&mut state, &self.spec)
|
||||||
|
.map_err(HotColdDbError::BlockReplaySlotError)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(state)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch a copy of the current split slot from memory.
|
||||||
pub fn get_split_slot(&self) -> Slot {
|
pub fn get_split_slot(&self) -> Slot {
|
||||||
*self.split_slot.read()
|
self.split.read().slot
|
||||||
}
|
}
|
||||||
|
|
||||||
fn load_split_slot(&self) -> Result<Option<Slot>, Error> {
|
/// Load the split point from disk.
|
||||||
let key = Hash256::from_slice(SPLIT_SLOT_DB_KEY.as_bytes());
|
fn load_split(&self) -> Result<Option<Split>, Error> {
|
||||||
let split_slot: Option<SplitSlot> = self.hot_db.get(&key)?;
|
let key = Hash256::from_slice(SPLIT_DB_KEY.as_bytes());
|
||||||
Ok(split_slot.map(|s| Slot::new(s.0)))
|
let split: Option<Split> = self.hot_db.get(&key)?;
|
||||||
|
Ok(split)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn store_split_slot(&self) -> Result<(), Error> {
|
/// Store the split point on disk.
|
||||||
let key = Hash256::from_slice(SPLIT_SLOT_DB_KEY.as_bytes());
|
fn store_split(&self) -> Result<(), Error> {
|
||||||
self.hot_db
|
let key = Hash256::from_slice(SPLIT_DB_KEY.as_bytes());
|
||||||
.put(&key, &SplitSlot(self.get_split_slot().as_u64()))?;
|
self.hot_db.put(&key, &*self.split.read())?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Load the state root of a restore point.
|
||||||
|
fn load_restore_point_hash(&self, restore_point_index: u64) -> Result<Hash256, Error> {
|
||||||
|
let key = Self::restore_point_key(restore_point_index);
|
||||||
|
RestorePointHash::db_get(&self.cold_db, &key)?
|
||||||
|
.map(|r| r.state_root)
|
||||||
|
.ok_or(HotColdDbError::MissingRestorePointHash(restore_point_index).into())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Store the state root of a restore point.
|
||||||
|
fn store_restore_point_hash(
|
||||||
|
&self,
|
||||||
|
restore_point_index: u64,
|
||||||
|
state_root: Hash256,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let key = Self::restore_point_key(restore_point_index);
|
||||||
|
RestorePointHash { state_root }
|
||||||
|
.db_put(&self.cold_db, &key)
|
||||||
|
.map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert a `restore_point_index` into a database key.
|
||||||
|
fn restore_point_key(restore_point_index: u64) -> Hash256 {
|
||||||
|
Hash256::from_low_u64_be(restore_point_index)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Load a frozen state's slot, given its root.
|
||||||
|
fn load_state_slot(&self, state_root: &Hash256) -> Result<Slot, Error> {
|
||||||
|
StateSlot::db_get(&self.cold_db, state_root)?
|
||||||
|
.map(|s| s.slot)
|
||||||
|
.ok_or_else(|| HotColdDbError::MissingStateSlot(*state_root).into())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Store the slot of a frozen state.
|
||||||
|
fn store_state_slot(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
|
||||||
|
StateSlot { slot }
|
||||||
|
.db_put(&self.cold_db, state_root)
|
||||||
|
.map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check that the restore point frequency is a divisor of the slots per historical root.
|
||||||
|
///
|
||||||
|
/// This ensures that we have at least one restore point within range of our state
|
||||||
|
/// root history when iterating backwards (and allows for more frequent restore points if
|
||||||
|
/// desired).
|
||||||
|
fn verify_slots_per_restore_point<E: EthSpec>(
|
||||||
|
slots_per_restore_point: u64,
|
||||||
|
) -> Result<(), HotColdDbError> {
|
||||||
|
let slots_per_historical_root = E::SlotsPerHistoricalRoot::to_u64();
|
||||||
|
if slots_per_restore_point > 0 && slots_per_historical_root % slots_per_restore_point == 0 {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(HotColdDbError::InvalidSlotsPerRestorePoint {
|
||||||
|
slots_per_restore_point,
|
||||||
|
slots_per_historical_root,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Struct for storing the split slot in the database.
|
/// Struct for storing the split slot and state root in the database.
|
||||||
#[derive(Clone, Copy)]
|
#[derive(Clone, Copy, Default, Encode, Decode)]
|
||||||
struct SplitSlot(u64);
|
struct Split {
|
||||||
|
slot: Slot,
|
||||||
|
state_root: Hash256,
|
||||||
|
}
|
||||||
|
|
||||||
impl SimpleStoreItem for SplitSlot {
|
impl SimpleStoreItem for Split {
|
||||||
fn db_column() -> DBColumn {
|
fn db_column() -> DBColumn {
|
||||||
DBColumn::BeaconMeta
|
DBColumn::BeaconMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
fn as_store_bytes(&self) -> Vec<u8> {
|
fn as_store_bytes(&self) -> Vec<u8> {
|
||||||
self.0.as_ssz_bytes()
|
self.as_ssz_bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
|
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
|
||||||
Ok(SplitSlot(u64::from_ssz_bytes(bytes)?))
|
Ok(Self::from_ssz_bytes(bytes)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Struct for storing the slot of a state root in the database.
|
||||||
|
#[derive(Clone, Copy, Default, Encode, Decode)]
|
||||||
|
struct StateSlot {
|
||||||
|
slot: Slot,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SimpleStoreItem for StateSlot {
|
||||||
|
fn db_column() -> DBColumn {
|
||||||
|
DBColumn::BeaconStateSlot
|
||||||
|
}
|
||||||
|
|
||||||
|
fn as_store_bytes(&self) -> Vec<u8> {
|
||||||
|
self.as_ssz_bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
|
||||||
|
Ok(Self::from_ssz_bytes(bytes)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Struct for storing the state root of a restore point in the database.
|
||||||
|
#[derive(Clone, Copy, Default, Encode, Decode)]
|
||||||
|
struct RestorePointHash {
|
||||||
|
state_root: Hash256,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SimpleStoreItem for RestorePointHash {
|
||||||
|
fn db_column() -> DBColumn {
|
||||||
|
DBColumn::BeaconRestorePoint
|
||||||
|
}
|
||||||
|
|
||||||
|
fn as_store_bytes(&self) -> Vec<u8> {
|
||||||
|
self.as_ssz_bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
|
||||||
|
Ok(Self::from_ssz_bytes(bytes)?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
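A worked example of the `load_intermediate_state` arithmetic above (standalone sketch; `sprp = 64` assumed):

    // A state at slot 100 has low restore point index 100 / 64 = 1 (slot 64)
    // and high index 2 (slot 128); blocks for slots 65..=100 are replayed on
    // top of the slot-64 state, then the result is verified by tree hash.
    fn bounding_restore_point_indices(slot: u64, sprp: u64) -> (u64, u64) {
        let low = slot / sprp;
        (low, low + 1)
    }

    fn main() {
        assert_eq!(bounding_restore_point_indices(100, 64), (1, 2));
    }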
@@ -1,7 +1,10 @@
 use crate::Store;
 use std::borrow::Cow;
+use std::marker::PhantomData;
 use std::sync::Arc;
-use types::{BeaconBlock, BeaconState, BeaconStateError, EthSpec, Hash256, Slot};
+use types::{
+    typenum::Unsigned, BeaconBlock, BeaconState, BeaconStateError, EthSpec, Hash256, Slot,
+};
 
 /// Implemented for types that have ancestors (e.g., blocks, states) that may be iterated over.
 ///
@@ -80,12 +83,9 @@ impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> {
             match self.beacon_state.get_state_root(self.slot) {
                 Ok(root) => Some((*root, self.slot)),
                 Err(BeaconStateError::SlotOutOfBounds) => {
-                    // Read a `BeaconState` from the store that has access to prior historical root.
-                    let beacon_state: BeaconState<T> = {
-                        let new_state_root = self.beacon_state.get_oldest_state_root().ok()?;
-
-                        self.store.get_state(&new_state_root, None).ok()?
-                    }?;
+                    // Read a `BeaconState` from the store that has access to prior historical roots.
+                    let beacon_state =
+                        next_historical_root_backtrack_state(&*self.store, &self.beacon_state)?;
 
                     self.beacon_state = Cow::Owned(beacon_state);
@@ -98,6 +98,39 @@ impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> {
     }
 }
 
+/// Block iterator that uses the `parent_root` of each block to backtrack.
+pub struct ParentRootBlockIterator<'a, E: EthSpec, S: Store> {
+    store: &'a S,
+    next_block_root: Hash256,
+    _phantom: PhantomData<E>,
+}
+
+impl<'a, E: EthSpec, S: Store> ParentRootBlockIterator<'a, E, S> {
+    pub fn new(store: &'a S, start_block_root: Hash256) -> Self {
+        Self {
+            store,
+            next_block_root: start_block_root,
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<'a, E: EthSpec, S: Store> Iterator for ParentRootBlockIterator<'a, E, S> {
+    type Item = BeaconBlock<E>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // Stop once we reach the zero parent, otherwise we'll keep returning the genesis
+        // block forever.
+        if self.next_block_root.is_zero() {
+            None
+        } else {
+            let block: BeaconBlock<E> = self.store.get(&self.next_block_root).ok()??;
+            self.next_block_root = block.parent_root;
+            Some(block)
+        }
+    }
+}
+
 #[derive(Clone)]
 /// Extends `BlockRootsIterator`, returning `BeaconBlock` instances, instead of their roots.
 pub struct BlockIterator<'a, T: EthSpec, U> {
@@ -177,7 +210,7 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> {
     type Item = (Hash256, Slot);
 
     fn next(&mut self) -> Option<Self::Item> {
-        if (self.slot == 0) || (self.slot > self.beacon_state.slot) {
+        if self.slot == 0 || self.slot > self.beacon_state.slot {
             return None;
         }
 
@@ -186,13 +219,9 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> {
             match self.beacon_state.get_block_root(self.slot) {
                 Ok(root) => Some((*root, self.slot)),
                 Err(BeaconStateError::SlotOutOfBounds) => {
-                    // Read a `BeaconState` from the store that has access to prior historical root.
-                    let beacon_state: BeaconState<T> = {
-                        // Load the earliest state from disk.
-                        let new_state_root = self.beacon_state.get_oldest_state_root().ok()?;
-
-                        self.store.get_state(&new_state_root, None).ok()?
-                    }?;
+                    // Read a `BeaconState` from the store that has access to prior historical roots.
+                    let beacon_state =
+                        next_historical_root_backtrack_state(&*self.store, &self.beacon_state)?;
 
                     self.beacon_state = Cow::Owned(beacon_state);
@@ -205,6 +234,26 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> {
     }
 }
 
+/// Fetch the next state to use whilst backtracking in `*RootsIterator`.
+fn next_historical_root_backtrack_state<E: EthSpec, S: Store>(
+    store: &S,
+    current_state: &BeaconState<E>,
+) -> Option<BeaconState<E>> {
+    // For compatibility with the freezer database's restore points, we load a state at
+    // a restore point slot (thus avoiding replaying blocks). In the case where we're
+    // not frozen, this just means we might not jump back by the maximum amount on
+    // our first jump (i.e. at most 1 extra state load).
+    let new_state_slot = slot_of_prev_restore_point::<E>(current_state.slot);
+    let new_state_root = current_state.get_state_root(new_state_slot).ok()?;
+    store.get_state(new_state_root, Some(new_state_slot)).ok()?
+}
+
+/// Compute the slot of the last guaranteed restore point in the freezer database.
+fn slot_of_prev_restore_point<E: EthSpec>(current_slot: Slot) -> Slot {
+    let slots_per_historical_root = E::SlotsPerHistoricalRoot::to_u64();
+    (current_slot - 1) / slots_per_historical_root * slots_per_historical_root
+}
+
 pub type ReverseBlockRootIterator<'a, E, S> =
     ReverseHashAndSlotIterator<BlockRootsIterator<'a, E, S>>;
 pub type ReverseStateRootIterator<'a, E, S> =
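The `(current_slot - 1)` in `slot_of_prev_restore_point` above is easy to misread; a standalone sketch of its behaviour (with `SlotsPerHistoricalRoot = 8192` assumed):

    // Backtracking from a slot exactly on a boundary jumps a full period back,
    // so the iterator always makes progress instead of reloading the same state.
    fn slot_of_prev_restore_point(current_slot: u64, sphr: u64) -> u64 {
        (current_slot - 1) / sphr * sphr
    }

    fn main() {
        assert_eq!(slot_of_prev_restore_point(8192, 8192), 0);
        assert_eq!(slot_of_prev_restore_point(8193, 8192), 8192);
        assert_eq!(slot_of_prev_restore_point(20_000, 8192), 16_384);
    }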
@@ -12,6 +12,7 @@ extern crate lazy_static;
 
 mod block_at_slot;
 pub mod chunked_vector;
+pub mod config;
 mod errors;
 mod hot_cold_store;
 mod impls;
@@ -25,6 +26,7 @@ pub mod migrate;
 
 use std::sync::Arc;
 
+pub use self::config::StoreConfig;
 pub use self::hot_cold_store::HotColdDB as DiskStore;
 pub use self::leveldb_store::LevelDB as SimpleDiskStore;
 pub use self::memory_store::MemoryStore;
@@ -117,6 +119,10 @@ pub enum DBColumn {
     BeaconBlock,
     BeaconState,
     BeaconChain,
+    /// For the table mapping restore point numbers to state roots.
+    BeaconRestorePoint,
+    /// For the mapping from state roots to their slots.
+    BeaconStateSlot,
     BeaconBlockRoots,
     BeaconStateRoots,
     BeaconHistoricalRoots,
@@ -131,6 +137,8 @@ impl Into<&'static str> for DBColumn {
             DBColumn::BeaconBlock => "blk",
             DBColumn::BeaconState => "ste",
             DBColumn::BeaconChain => "bch",
+            DBColumn::BeaconRestorePoint => "brp",
+            DBColumn::BeaconStateSlot => "bss",
             DBColumn::BeaconBlockRoots => "bbr",
             DBColumn::BeaconStateRoots => "bsr",
             DBColumn::BeaconHistoricalRoots => "bhr",
@@ -263,9 +271,17 @@ mod tests {
 
     let hot_dir = tempdir().unwrap();
     let cold_dir = tempdir().unwrap();
+    let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64;
     let spec = MinimalEthSpec::default_spec();
     let log = NullLoggerBuilder.build().unwrap();
-    let store = DiskStore::open(&hot_dir.path(), &cold_dir.path(), spec, log).unwrap();
+    let store = DiskStore::open::<MinimalEthSpec>(
+        &hot_dir.path(),
+        &cold_dir.path(),
+        slots_per_restore_point,
+        spec,
+        log,
+    )
+    .unwrap();
 
     test_impl(store);
 }
@@ -576,6 +576,14 @@ impl<T: EthSpec> BeaconState<T> {
         Ok(&self.state_roots[i])
     }
 
+    /// Gets the oldest (earliest slot) block root.
+    ///
+    /// Spec v0.9.1
+    pub fn get_oldest_block_root(&self) -> Result<&Hash256, Error> {
+        let i = self.get_latest_block_roots_index(self.slot - self.block_roots.len() as u64)?;
+        Ok(&self.block_roots[i])
+    }
+
     /// Sets the latest state root for slot.
     ///
     /// Spec v0.9.1
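As a quick sanity check on the index arithmetic in `get_oldest_block_root` (illustrative numbers, not from the diff):

    // With SLOTS_PER_HISTORICAL_ROOT = 8192 and state.slot = 10_000, the oldest
    // root still held in the circular `block_roots` buffer belongs to slot
    // 10_000 - 8192 = 1_808.
    fn oldest_block_root_slot(state_slot: u64, buffer_len: u64) -> u64 {
        state_slot - buffer_len
    }

    fn main() {
        assert_eq!(oldest_block_root_slot(10_000, 8192), 1_808);
    }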