Add database schema versioning (#1688)

## Issue Addressed

Closes #673

## Proposed Changes

Store a schema version in the database so that future releases can check they're running against a compatible database version. This would also enable automatic migration on breaking database changes, but that's left as future work.

The database config is also stored in the database so that the `slots_per_restore_point` value can be checked for consistency, which closes #673.
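
In rough terms, the schema check at startup compares a version number persisted in the hot database against the version compiled into the software, stamps fresh databases with the current version, and refuses to open anything else. A minimal, self-contained sketch of that logic (plain `u64`s standing in for the `SchemaVersion` newtype added to the `store` crate below):

```rust
/// Schema version compiled into this release of the software.
const CURRENT_SCHEMA_VERSION: u64 = 1;

#[derive(Debug)]
enum OpenError {
    /// The on-disk schema does not match the software's schema.
    UnsupportedSchemaVersion { software_version: u64, disk_version: u64 },
}

/// Check the version found on disk (if any) against the current version,
/// returning the version that should now be stored on disk.
fn check_schema_version(on_disk: Option<u64>) -> Result<u64, OpenError> {
    match on_disk {
        // Existing database with a different schema: refuse to open (no migrations yet).
        Some(disk_version) if disk_version != CURRENT_SCHEMA_VERSION => {
            Err(OpenError::UnsupportedSchemaVersion {
                software_version: CURRENT_SCHEMA_VERSION,
                disk_version,
            })
        }
        // Existing database with a matching schema: carry on.
        Some(disk_version) => Ok(disk_version),
        // Fresh database: stamp it with the current version.
        None => Ok(CURRENT_SCHEMA_VERSION),
    }
}

fn main() {
    assert!(check_schema_version(Some(1)).is_ok());
    assert!(check_schema_version(None).is_ok());
    assert!(check_schema_version(Some(2)).is_err());
}
```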
Authored by Michael Sproul on 2020-09-30 02:36:07 +00:00; committed by Paul Hauner.
Parent: cdec3cec18, commit: 22aedda1be
10 changed files with 153 additions and 42 deletions

View File

@@ -66,10 +66,11 @@ pub const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
/// validator pubkey cache.
pub const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
pub const BEACON_CHAIN_DB_KEY: [u8; 32] = [0; 32];
pub const OP_POOL_DB_KEY: [u8; 32] = [0; 32];
pub const ETH1_CACHE_DB_KEY: [u8; 32] = [0; 32];
pub const FORK_CHOICE_DB_KEY: [u8; 32] = [0; 32];
// These keys are all zero because they get stored in different columns, see `DBColumn` type.
pub const BEACON_CHAIN_DB_KEY: Hash256 = Hash256::zero();
pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero();
pub const ETH1_CACHE_DB_KEY: Hash256 = Hash256::zero();
pub const FORK_CHOICE_DB_KEY: Hash256 = Hash256::zero();
/// The result of a chain segment processing.
pub enum ChainSegmentResult<T: EthSpec> {
@@ -260,7 +261,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let fork_choice = self.fork_choice.read();
self.store.put_item(
&Hash256::from_slice(&FORK_CHOICE_DB_KEY),
&FORK_CHOICE_DB_KEY,
&PersistedForkChoice {
fork_choice: fork_choice.to_persisted(),
fork_choice_store: fork_choice.fc_store().to_persisted(),
@@ -272,8 +273,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
metrics::stop_timer(fork_choice_timer);
let head_timer = metrics::start_timer(&metrics::PERSIST_HEAD);
self.store
.put_item(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY), &persisted_head)?;
self.store.put_item(&BEACON_CHAIN_DB_KEY, &persisted_head)?;
metrics::stop_timer(head_timer);
@@ -290,7 +290,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let _timer = metrics::start_timer(&metrics::PERSIST_OP_POOL);
self.store.put_item(
&Hash256::from_slice(&OP_POOL_DB_KEY),
&OP_POOL_DB_KEY,
&PersistedOperationPool::from_operation_pool(&self.op_pool),
)?;
@@ -302,10 +302,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let _timer = metrics::start_timer(&metrics::PERSIST_OP_POOL);
if let Some(eth1_chain) = self.eth1_chain.as_ref() {
self.store.put_item(
&Hash256::from_slice(&ETH1_CACHE_DB_KEY),
&eth1_chain.as_ssz_container(),
)?;
self.store
.put_item(&ETH1_CACHE_DB_KEY, &eth1_chain.as_ssz_container())?;
}
Ok(())

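The constants above all collapse to `Hash256::zero()` because, as the new comment notes, each item lives in its own `DBColumn` and the store prefixes every key with its column before it touches the underlying key-value database (see the `get_key_for_col` helper imported in `hot_cold_store.rs` further down). A standalone illustration of why identical per-item keys cannot collide across columns; the prefixes and byte layout here are assumptions for illustration, not the crate's exact scheme:

```rust
/// Hypothetical column prefixes, standing in for `DBColumn`'s `&'static str` mapping.
const BEACON_CHAIN_COLUMN: &str = "bch"; // illustrative only
const OP_POOL_COLUMN: &str = "opo"; // illustrative only

/// Build the full on-disk key from a column prefix and an item key.
fn key_for_col(column: &str, key: &[u8; 32]) -> Vec<u8> {
    let mut out = Vec::with_capacity(column.len() + key.len());
    out.extend_from_slice(column.as_bytes());
    out.extend_from_slice(key);
    out
}

fn main() {
    let zero_key = [0u8; 32];
    // Same all-zero item key, different columns => different keys on disk.
    assert_ne!(
        key_for_col(BEACON_CHAIN_COLUMN, &zero_key),
        key_for_col(OP_POOL_COLUMN, &zero_key)
    );
}
```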
View File

@@ -229,7 +229,7 @@ where
.ok_or_else(|| "get_persisted_eth1_backend requires a store.".to_string())?;
store
.get_item::<SszEth1>(&Hash256::from_slice(&ETH1_CACHE_DB_KEY))
.get_item::<SszEth1>(&ETH1_CACHE_DB_KEY)
.map_err(|e| format!("DB error whilst reading eth1 cache: {:?}", e))
}
@@ -241,7 +241,7 @@ where
.ok_or_else(|| "store_contains_beacon_chain requires a store.".to_string())?;
Ok(store
.get_item::<PersistedBeaconChain>(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY))
.get_item::<PersistedBeaconChain>(&BEACON_CHAIN_DB_KEY)
.map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))?
.is_some())
}
@@ -272,7 +272,7 @@ where
.ok_or_else(|| "resume_from_db requires a store.".to_string())?;
let chain = store
.get_item::<PersistedBeaconChain>(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY))
.get_item::<PersistedBeaconChain>(&BEACON_CHAIN_DB_KEY)
.map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))?
.ok_or_else(|| {
"No persisted beacon chain found in store. Try purging the beacon chain database."
@@ -280,7 +280,7 @@ where
})?;
let persisted_fork_choice = store
.get_item::<PersistedForkChoice>(&Hash256::from_slice(&FORK_CHOICE_DB_KEY))
.get_item::<PersistedForkChoice>(&FORK_CHOICE_DB_KEY)
.map_err(|e| format!("DB error when reading persisted fork choice: {:?}", e))?
.ok_or_else(|| "No persisted fork choice present in database.".to_string())?;
@@ -307,7 +307,7 @@ where
self.op_pool = Some(
store
.get_item::<PersistedOperationPool<TEthSpec>>(&Hash256::from_slice(&OP_POOL_DB_KEY))
.get_item::<PersistedOperationPool<TEthSpec>>(&OP_POOL_DB_KEY)
.map_err(|e| format!("DB error whilst reading persisted op pool: {:?}", e))?
.map(PersistedOperationPool::into_operation_pool)
.unwrap_or_else(OperationPool::new),

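For context, every object persisted this way follows the same pattern: the type names a `DBColumn`, serializes itself to bytes, and is read or written under a fixed `Hash256` key via `get_item`/`put_item`. A stripped-down, standalone imitation of that pattern (the trait and store below are toys, not the real `store` crate API):

```rust
use std::collections::HashMap;

/// Toy stand-in for the real `StoreItem` trait: name a column, encode/decode bytes.
trait Item: Sized {
    fn column() -> &'static str;
    fn as_bytes(&self) -> Vec<u8>;
    fn from_bytes(bytes: &[u8]) -> Option<Self>;
}

/// Toy in-memory store keyed by (column, 32-byte key), mimicking `put_item`/`get_item`.
#[derive(Default)]
struct Store {
    map: HashMap<(&'static str, [u8; 32]), Vec<u8>>,
}

impl Store {
    fn put_item<I: Item>(&mut self, key: &[u8; 32], item: &I) {
        self.map.insert((I::column(), *key), item.as_bytes());
    }
    fn get_item<I: Item>(&self, key: &[u8; 32]) -> Option<I> {
        self.map.get(&(I::column(), *key)).and_then(|b| I::from_bytes(b))
    }
}

/// Example item stored under the all-zero key, like the persisted chain/op-pool items.
struct Version(u64);

impl Item for Version {
    fn column() -> &'static str {
        "bma" // the `BeaconMeta` column prefix from `DBColumn`
    }
    fn as_bytes(&self) -> Vec<u8> {
        self.0.to_le_bytes().to_vec()
    }
    fn from_bytes(bytes: &[u8]) -> Option<Self> {
        let mut arr = [0u8; 8];
        if bytes.len() != arr.len() {
            return None;
        }
        arr.copy_from_slice(bytes);
        Some(Version(u64::from_le_bytes(arr)))
    }
}

fn main() {
    let mut store = Store::default();
    store.put_item(&[0u8; 32], &Version(1));
    assert_eq!(store.get_item::<Version>(&[0u8; 32]).map(|v| v.0), Some(1));
}
```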
View File

@@ -357,11 +357,10 @@ fn roundtrip_operation_pool() {
.persist_op_pool()
.expect("should persist op pool");
let key = Hash256::from_slice(&OP_POOL_DB_KEY);
let restored_op_pool = harness
.chain
.store
.get_item::<PersistedOperationPool<MinimalEthSpec>>(&key)
.get_item::<PersistedOperationPool<MinimalEthSpec>>(&OP_POOL_DB_KEY)
.expect("should read db")
.expect("should find op pool")
.into_operation_pool();

View File

@@ -3,15 +3,14 @@ use std::sync::Arc;
use store::{DBColumn, Error as StoreError, HotColdDB, ItemStore, StoreItem};
use types::{EthSpec, Hash256};
/// 32-byte key for accessing the `DhtEnrs`.
pub const DHT_DB_KEY: &str = "PERSISTEDDHTPERSISTEDDHTPERSISTE";
/// 32-byte key for accessing the `DhtEnrs`. All zero because `DhtEnrs` has its own column.
pub const DHT_DB_KEY: Hash256 = Hash256::zero();
pub fn load_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: Arc<HotColdDB<E, Hot, Cold>>,
) -> Vec<Enr> {
// Load DHT from store
let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes());
match store.get_item(&key) {
match store.get_item(&DHT_DB_KEY) {
Ok(Some(p)) => {
let p: PersistedDht = p;
p.enrs
@@ -25,9 +24,7 @@ pub fn persist_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: Arc<HotColdDB<E, Hot, Cold>>,
enrs: Vec<Enr>,
) -> Result<(), store::Error> {
let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes());
store.put_item(&key, &PersistedDht { enrs })?;
Ok(())
store.put_item(&DHT_DB_KEY, &PersistedDht { enrs })
}
/// Wrapper around DHT for persistence to disk.
@@ -61,7 +58,7 @@ mod tests {
use std::str::FromStr;
use store::config::StoreConfig;
use store::{HotColdDB, MemoryStore};
use types::{ChainSpec, Hash256, MinimalEthSpec};
use types::{ChainSpec, MinimalEthSpec};
#[test]
fn test_persisted_dht() {
let log = NullLoggerBuilder.build().unwrap();
@@ -71,11 +68,10 @@ mod tests {
MemoryStore<MinimalEthSpec>,
> = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log).unwrap();
let enrs = vec![Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap()];
let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes());
store
.put_item(&key, &PersistedDht { enrs: enrs.clone() })
.put_item(&DHT_DB_KEY, &PersistedDht { enrs: enrs.clone() })
.unwrap();
let dht: PersistedDht = store.get_item(&key).unwrap().unwrap();
let dht: PersistedDht = store.get_item(&DHT_DB_KEY).unwrap().unwrap();
assert_eq!(dht.enrs, enrs);
}
}

View File

@@ -267,7 +267,8 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.long("slots-per-restore-point")
.value_name("SLOT_COUNT")
.help("Specifies how often a freezer DB restore point should be stored. \
DO NOT DECREASE AFTER INITIALIZATION. [default: 2048 (mainnet) or 64 (minimal)]")
Cannot be changed after initialization. \
[default: 2048 (mainnet) or 64 (minimal)]")
.takes_value(true)
)
.arg(

View File

@@ -1,11 +1,14 @@
use crate::{DBColumn, Error, StoreItem};
use serde_derive::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{EthSpec, MinimalEthSpec};
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5;
/// Database configuration parameters.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
pub struct StoreConfig {
/// Number of slots to wait between storing restore points in the freezer database.
pub slots_per_restore_point: u64,
@@ -13,6 +16,11 @@ pub struct StoreConfig {
pub block_cache_size: usize,
}
#[derive(Debug, Clone)]
pub enum StoreConfigError {
MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 },
}
impl Default for StoreConfig {
fn default() -> Self {
Self {
@@ -22,3 +30,29 @@ impl Default for StoreConfig {
}
}
}
impl StoreConfig {
pub fn check_compatibility(&self, on_disk_config: &Self) -> Result<(), StoreConfigError> {
if self.slots_per_restore_point != on_disk_config.slots_per_restore_point {
return Err(StoreConfigError::MismatchedSlotsPerRestorePoint {
config: self.slots_per_restore_point,
on_disk: on_disk_config.slots_per_restore_point,
});
}
Ok(())
}
}
impl StoreItem for StoreConfig {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}

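As a usage sketch (assuming a crate that depends on `store`, as the tests elsewhere in this diff do), the new check accepts a config that matches the on-disk values and rejects one whose `slots_per_restore_point` differs:

```rust
use store::config::StoreConfig;

fn main() {
    let on_disk = StoreConfig::default();

    // Identical config: compatible with what's on disk.
    let same = on_disk.clone();
    assert!(same.check_compatibility(&on_disk).is_ok());

    // Changed slots_per_restore_point: rejected with MismatchedSlotsPerRestorePoint.
    let changed = StoreConfig {
        slots_per_restore_point: 2 * on_disk.slots_per_restore_point,
        ..on_disk.clone()
    };
    assert!(changed.check_compatibility(&on_disk).is_err());
}
```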
View File

@@ -1,4 +1,5 @@
use crate::chunked_vector::ChunkError;
use crate::config::StoreConfigError;
use crate::hot_cold_store::HotColdDBError;
use ssz::DecodeError;
use types::{BeaconStateError, Hash256, Slot};
@@ -17,6 +18,7 @@ pub enum Error {
BlockNotFound(Hash256),
NoContinuationData,
SplitPointModified(Slot, Slot),
ConfigError(StoreConfigError),
}
impl From<DecodeError> for Error {
@@ -49,6 +51,12 @@ impl From<DBError> for Error {
}
}
impl From<StoreConfigError> for Error {
fn from(e: StoreConfigError) -> Error {
Error::ConfigError(e)
}
}
#[derive(Debug)]
pub struct DBError {
pub message: String,

View File

@@ -7,6 +7,9 @@ use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::leveldb_store::LevelDB;
use crate::memory_store::MemoryStore;
use crate::metadata::{
SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, SCHEMA_VERSION_KEY, SPLIT_KEY,
};
use crate::metrics;
use crate::{
get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem,
@@ -27,9 +30,6 @@ use std::path::Path;
use std::sync::Arc;
use types::*;
/// 32-byte key for accessing the `split` of the freezer DB.
pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE";
/// Defines how blocks should be replayed on states.
#[derive(PartialEq)]
pub enum BlockReplay {
@@ -46,6 +46,8 @@ pub enum BlockReplay {
/// intermittent "restore point" states pre-finalization.
#[derive(Debug)]
pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// The schema version. Loaded from disk on initialization.
schema_version: SchemaVersion,
/// The slot and state root at the point where the database is split between hot and cold.
///
/// States with slots less than `split.slot` are in the cold DB, while states with slots
@@ -70,6 +72,10 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
#[derive(Debug, PartialEq)]
pub enum HotColdDBError {
UnsupportedSchemaVersion {
software_version: SchemaVersion,
disk_version: SchemaVersion,
},
/// Recoverable error indicating that the database freeze point couldn't be updated
/// due to the finalized block not lying on an epoch boundary (should be infrequent).
FreezeSlotUnaligned(Slot),
@@ -106,6 +112,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
let db = HotColdDB {
schema_version: CURRENT_SCHEMA_VERSION,
split: RwLock::new(Split::default()),
cold_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
@@ -134,6 +141,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
let db = HotColdDB {
schema_version: CURRENT_SCHEMA_VERSION,
split: RwLock::new(Split::default()),
cold_db: LevelDB::open(cold_path)?,
hot_db: LevelDB::open(hot_path)?,
@@ -144,12 +152,33 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
_phantom: PhantomData,
};
// Ensure that the schema version of the on-disk database matches the software.
// In the future, this would be the spot to hook in auto-migration, etc.
if let Some(schema_version) = db.load_schema_version()? {
if schema_version != CURRENT_SCHEMA_VERSION {
return Err(HotColdDBError::UnsupportedSchemaVersion {
software_version: CURRENT_SCHEMA_VERSION,
disk_version: schema_version,
}
.into());
}
} else {
db.store_schema_version(CURRENT_SCHEMA_VERSION)?;
}
// Ensure that any on-disk config is compatible with the supplied config.
if let Some(disk_config) = db.load_config()? {
db.config.check_compatibility(&disk_config)?;
}
db.store_config()?;
// Load the previous split slot from the database (if any). This ensures we can
// stop and restart correctly.
if let Some(split) = db.load_split()? {
info!(
db.log,
"Hot-Cold DB initialized";
"version" => db.schema_version.0,
"split_slot" => split.slot,
"split_state" => format!("{:?}", split.state_root)
);
@@ -744,11 +773,29 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
* self.config.slots_per_restore_point
}
/// Load the database schema version from disk.
fn load_schema_version(&self) -> Result<Option<SchemaVersion>, Error> {
self.hot_db.get(&SCHEMA_VERSION_KEY)
}
/// Store the database schema version.
fn store_schema_version(&self, schema_version: SchemaVersion) -> Result<(), Error> {
self.hot_db.put(&SCHEMA_VERSION_KEY, &schema_version)
}
/// Load previously-stored config from disk.
fn load_config(&self) -> Result<Option<StoreConfig>, Error> {
self.hot_db.get(&CONFIG_KEY)
}
/// Write the config to disk.
fn store_config(&self) -> Result<(), Error> {
self.hot_db.put(&CONFIG_KEY, &self.config)
}
/// Load the split point from disk.
fn load_split(&self) -> Result<Option<Split>, Error> {
let key = Hash256::from_slice(SPLIT_DB_KEY.as_bytes());
let split: Option<Split> = self.hot_db.get(&key)?;
Ok(split)
self.hot_db.get(&SPLIT_KEY)
}
/// Load the state root of a restore point.
@@ -927,9 +974,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
slot: frozen_head.slot,
state_root: frozen_head_root,
};
store
.hot_db
.put_sync(&Hash256::from_slice(SPLIT_DB_KEY.as_bytes()), &split)?;
store.hot_db.put_sync(&SPLIT_KEY, &split)?;
// Split point is now persisted in the hot database on disk. The in-memory split point
// hasn't been modified elsewhere since we keep a write lock on it. It's safe to update

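The `// In the future, this would be the spot to hook in auto-migration` comment above marks where migrations could later slot in. Purely as a hypothetical sketch of that future work (nothing like this exists in the PR), a hook could walk the on-disk version up to the software version instead of returning `UnsupportedSchemaVersion`:

```rust
/// Hypothetical: apply in-order migrations to bring the on-disk schema up to `target`.
fn migrate_schema(disk: u64, target: u64) -> Result<u64, String> {
    let mut version = disk;
    while version < target {
        match version {
            // e.g. a v1 -> v2 step would re-key or re-encode whatever v2 changes.
            1 => { /* hypothetical v1 -> v2 migration would run here */ }
            v => return Err(format!("no migration defined from schema v{}", v)),
        }
        version += 1;
    }
    if version == target {
        Ok(version)
    } else {
        Err(format!("cannot downgrade from v{} to v{}", version, target))
    }
}

fn main() {
    assert_eq!(migrate_schema(1, 2), Ok(2));
    assert!(migrate_schema(3, 2).is_err());
}
```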
View File

@@ -19,6 +19,7 @@ pub mod hot_cold_store;
mod impls;
mod leveldb_store;
mod memory_store;
mod metadata;
mod metrics;
mod partial_beacon_state;
@@ -153,7 +154,7 @@ pub enum DBColumn {
}
impl Into<&'static str> for DBColumn {
/// Returns a `&str` that can be used for keying a key-value data base.
/// Returns a `&str` prefix to be added to keys before they hit the key-value database.
fn into(self) -> &'static str {
match self {
DBColumn::BeaconMeta => "bma",

View File

@@ -0,0 +1,29 @@
use crate::{DBColumn, Error, StoreItem};
use ssz::{Decode, Encode};
use types::Hash256;
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(1);
// All the keys that get stored under the `BeaconMeta` column.
//
// We use `repeat_byte` because it's a const fn.
pub const SCHEMA_VERSION_KEY: Hash256 = Hash256::repeat_byte(0);
pub const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1);
pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SchemaVersion(pub u64);
impl StoreItem for SchemaVersion {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.0.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(SchemaVersion(u64::from_ssz_bytes(bytes)?))
}
}
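
The three metadata keys only need to be distinct constants within the `BeaconMeta` column, and `Hash256::repeat_byte` is used because it can be evaluated in a `const` context. A small usage sketch (assuming the `ethereum-types`-style `H256` API re-exported by `types`, where `repeat_byte(b)` fills all 32 bytes with `b` and `as_bytes` exposes them):

```rust
use types::Hash256;

const SCHEMA_VERSION_KEY: Hash256 = Hash256::repeat_byte(0);
const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1);
const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);

fn main() {
    // Three distinct 32-byte constants: all-zeros, all-ones, all-twos.
    assert_eq!(SCHEMA_VERSION_KEY, Hash256::zero());
    assert_eq!(CONFIG_KEY.as_bytes(), &[1u8; 32][..]);
    assert_ne!(CONFIG_KEY, SPLIT_KEY);
}
```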