Remove DB migrations for legacy database schemas (#3181)

## Proposed Changes

Remove support for DB migrations that upgrade from schema versions below 5. This is mostly for cosmetic/code-quality reasons, as upgrading from versions of Lighthouse this old almost always requires a re-sync anyway.

## Additional Info

The minimum supported database schema is now version 5.
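For context, `migrate_schema` keeps its recursive splitting of multi-step migrations and simply no longer has any match arms below `(SchemaVersion(5), SchemaVersion(6))`, so pre-v5 databases fall through to an error. A minimal, self-contained sketch of that dispatch shape (illustrative only: the simplified signature and the `MigrationError` type here are stand-ins, not the real Lighthouse types):

```rust
// Sketch of the dispatch shape in `migrate_schema` after this change.
// The real function also threads the DB, datadir, and logger through.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct SchemaVersion(u64);

#[derive(Debug)]
enum MigrationError {
    // The on-disk schema predates the oldest supported version (now 5).
    UnsupportedSchemaVersion { from: SchemaVersion, to: SchemaVersion },
}

fn migrate_schema(from: SchemaVersion, to: SchemaVersion) -> Result<(), MigrationError> {
    match (from, to) {
        // No-op migration.
        (from, to) if from == to => Ok(()),
        // Multi-step upgrades are split into single-version hops, as in the
        // retained code shown in the diff below.
        (from, to) if from.0 + 1 < to.0 => {
            let next = SchemaVersion(from.0 + 1);
            migrate_schema(from, next)?;
            migrate_schema(next, to)
        }
        // Single-step migrations from v5 upwards keep their arms (bodies elided).
        (SchemaVersion(5), SchemaVersion(6)) => Ok(()),
        // Everything else, including anything below v5, now falls through to
        // an error instead of running a legacy migration.
        (from, to) => Err(MigrationError::UnsupportedSchemaVersion { from, to }),
    }
}

fn main() {
    // Upgrading from a pre-v5 schema is no longer supported: re-sync instead.
    assert!(migrate_schema(SchemaVersion(2), SchemaVersion(6)).is_err());
    assert!(migrate_schema(SchemaVersion(5), SchemaVersion(6)).is_ok());
}
```

In the real code the fall-through arm surfaces a store error; the effect for a pre-v5 database is the same: there is no migration path, only a re-sync.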
Author: Mac L
Date:   2022-05-17 04:54:39 +00:00
Parent: db8a6f81ea
Commit: def9bc660e

5 changed files with 23 additions and 323 deletions

@@ -91,7 +91,7 @@ pub enum BeaconChainError {
     BlockSignatureVerifierError(state_processing::block_signature_verifier::Error),
     BlockReplayError(BlockReplayError),
     DuplicateValidatorPublicKey,
-    ValidatorPubkeyCacheFileError(String),
+    ValidatorPubkeyCacheError(String),
     ValidatorIndexUnknown(usize),
     ValidatorPubkeyUnknown(PublicKeyBytes),
     OpPoolError(OpPoolError),

@@ -5,22 +5,14 @@ mod migration_schema_v8;
 mod migration_schema_v9;
 mod types;
 
-use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY};
+use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY};
 use crate::persisted_fork_choice::{PersistedForkChoiceV1, PersistedForkChoiceV7};
-use crate::validator_pubkey_cache::ValidatorPubkeyCache;
-use operation_pool::{PersistedOperationPool, PersistedOperationPoolBase};
 use slog::{warn, Logger};
-use ssz::{Decode, Encode};
-use ssz_derive::{Decode, Encode};
-use std::fs;
 use std::path::Path;
 use std::sync::Arc;
-use store::config::OnDiskStoreConfig;
 use store::hot_cold_store::{HotColdDB, HotColdDBError};
-use store::metadata::{SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION};
-use store::{DBColumn, Error as StoreError, ItemStore, StoreItem};
-
-const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz";
+use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
+use store::{Error as StoreError, StoreItem};
 
 /// Migrate the database from one schema version to another, applying all requisite mutations.
 pub fn migrate_schema<T: BeaconChainTypes>(
@@ -39,69 +31,11 @@ pub fn migrate_schema<T: BeaconChainTypes>(
             migrate_schema::<T>(db.clone(), datadir, from, next, log.clone())?;
             migrate_schema::<T>(db, datadir, next, to, log)
         }
-        // Migration from v0.3.0 to v0.3.x, adding the temporary states column.
-        // Nothing actually needs to be done, but once a DB uses v2 it shouldn't go back.
-        (SchemaVersion(1), SchemaVersion(2)) => {
-            db.store_schema_version(to)?;
-            Ok(())
-        }
-        // Migration for removing the pubkey cache.
-        (SchemaVersion(2), SchemaVersion(3)) => {
-            let pk_cache_path = datadir.join(PUBKEY_CACHE_FILENAME);
-            // Load from file, store to DB.
-            ValidatorPubkeyCache::<T>::load_from_file(&pk_cache_path)
-                .and_then(|cache| ValidatorPubkeyCache::convert(cache, db.clone()))
-                .map_err(|e| StoreError::SchemaMigrationError(format!("{:?}", e)))?;
-            db.store_schema_version(to)?;
-            // Delete cache file now that keys are stored in the DB.
-            fs::remove_file(&pk_cache_path).map_err(|e| {
-                StoreError::SchemaMigrationError(format!(
-                    "unable to delete {}: {:?}",
-                    pk_cache_path.display(),
-                    e
-                ))
-            })?;
-            Ok(())
-        }
-        // Migration for adding sync committee contributions to the persisted op pool.
-        (SchemaVersion(3), SchemaVersion(4)) => {
-            // Deserialize from what exists in the database using the `PersistedOperationPoolBase`
-            // variant and convert it to the Altair variant.
-            let pool_opt = db
-                .get_item::<PersistedOperationPoolBase<T::EthSpec>>(&OP_POOL_DB_KEY)?
-                .map(PersistedOperationPool::Base)
-                .map(PersistedOperationPool::base_to_altair);
-            if let Some(pool) = pool_opt {
-                // Store the converted pool under the same key.
-                db.put_item::<PersistedOperationPool<T::EthSpec>>(&OP_POOL_DB_KEY, &pool)?;
-            }
-            db.store_schema_version(to)?;
-            Ok(())
-        }
-        // Migration for weak subjectivity sync support and clean up of `OnDiskStoreConfig` (#1784).
-        (SchemaVersion(4), SchemaVersion(5)) => {
-            if let Some(OnDiskStoreConfigV4 {
-                slots_per_restore_point,
-                ..
-            }) = db.hot_db.get(&CONFIG_KEY)?
-            {
-                let new_config = OnDiskStoreConfig {
-                    slots_per_restore_point,
-                };
-                db.hot_db.put(&CONFIG_KEY, &new_config)?;
-            }
-            db.store_schema_version(to)?;
-            Ok(())
-        }
+        //
+        // Migrations from before SchemaVersion(5) are deprecated.
+        //
         // Migration for adding `execution_status` field to the fork choice store.
         (SchemaVersion(5), SchemaVersion(6)) => {
             // Database operations to be done atomically
@@ -201,24 +135,3 @@ pub fn migrate_schema<T: BeaconChainTypes>(
         .into()),
     }
 }
-
-// Store config used in v4 schema and earlier.
-#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
-pub struct OnDiskStoreConfigV4 {
-    pub slots_per_restore_point: u64,
-    pub _block_cache_size: usize,
-}
-
-impl StoreItem for OnDiskStoreConfigV4 {
-    fn db_column() -> DBColumn {
-        DBColumn::BeaconMeta
-    }
-
-    fn as_store_bytes(&self) -> Vec<u8> {
-        self.as_ssz_bytes()
-    }
-
-    fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
-        Ok(Self::from_ssz_bytes(bytes)?)
-    }
-}
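The removed `OnDiskStoreConfigV4` above is a typical `StoreItem`: a value that names its database column and round-trips itself through bytes. A self-contained sketch of that pattern (simplified stand-ins; real store items SSZ-encode themselves via `ssz_derive` rather than the hand-rolled byte encoding used here):

```rust
// Sketch of the `StoreItem` pattern visible throughout this diff.

#[derive(Debug, PartialEq)]
enum DBColumn {
    BeaconMeta,
}

#[derive(Debug)]
enum StoreError {
    Decode(String),
}

// Anything persisted in the store names its column and round-trips through bytes.
trait StoreItem: Sized {
    fn db_column() -> DBColumn;
    fn as_store_bytes(&self) -> Vec<u8>;
    fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError>;
}

// Toy stand-in for the removed `OnDiskStoreConfigV4`.
struct Config {
    slots_per_restore_point: u64,
}

impl StoreItem for Config {
    fn db_column() -> DBColumn {
        DBColumn::BeaconMeta
    }

    fn as_store_bytes(&self) -> Vec<u8> {
        self.slots_per_restore_point.to_le_bytes().to_vec()
    }

    fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
        let arr: [u8; 8] = bytes
            .try_into()
            .map_err(|_| StoreError::Decode("bad length".into()))?;
        Ok(Config {
            slots_per_restore_point: u64::from_le_bytes(arr),
        })
    }
}

fn main() {
    let config = Config {
        slots_per_restore_point: 2048,
    };
    let bytes = config.as_store_bytes();
    let restored = Config::from_store_bytes(&bytes).unwrap();
    assert_eq!(restored.slots_per_restore_point, 2048);
    assert_eq!(Config::db_column(), DBColumn::BeaconMeta);
}
```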

@@ -1,11 +1,8 @@
 use crate::errors::BeaconChainError;
 use crate::{BeaconChainTypes, BeaconStore};
-use ssz::{Decode, DecodeError, Encode};
+use ssz::{Decode, Encode};
 use std::collections::HashMap;
 use std::convert::TryInto;
-use std::fs::File;
-use std::io::{self, Read, Write};
-use std::path::Path;
 use store::{DBColumn, Error as StoreError, StoreItem};
 use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};
 
@@ -24,15 +21,7 @@ pub struct ValidatorPubkeyCache<T: BeaconChainTypes> {
     pubkeys: Vec<PublicKey>,
     indices: HashMap<PublicKeyBytes, usize>,
     pubkey_bytes: Vec<PublicKeyBytes>,
-    backing: PubkeyCacheBacking<T>,
-}
-
-/// Abstraction over on-disk backing.
-///
-/// `File` backing is legacy, `Database` is current.
-enum PubkeyCacheBacking<T: BeaconChainTypes> {
-    File(ValidatorPubkeyCacheFile),
-    Database(BeaconStore<T>),
+    store: BeaconStore<T>,
 }
 
 impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
@@ -48,7 +37,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
             pubkeys: vec![],
             indices: HashMap::new(),
             pubkey_bytes: vec![],
-            backing: PubkeyCacheBacking::Database(store),
+            store,
         };
 
         cache.import_new_pubkeys(state)?;
@@ -66,7 +55,9 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
             if let Some(DatabasePubkey(pubkey)) =
                 store.get_item(&DatabasePubkey::key_for_index(validator_index))?
             {
-                pubkeys.push((&pubkey).try_into().map_err(Error::PubkeyDecode)?);
+                pubkeys.push((&pubkey).try_into().map_err(|e| {
+                    BeaconChainError::ValidatorPubkeyCacheError(format!("{:?}", e))
+                })?);
                 pubkey_bytes.push(pubkey);
                 indices.insert(pubkey, validator_index);
             } else {
@@ -78,31 +69,10 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
             pubkeys,
             indices,
             pubkey_bytes,
-            backing: PubkeyCacheBacking::Database(store),
+            store,
         })
     }
 
-    /// DEPRECATED: used only for migration
-    pub fn load_from_file<P: AsRef<Path>>(path: P) -> Result<Self, BeaconChainError> {
-        ValidatorPubkeyCacheFile::open(&path)
-            .and_then(ValidatorPubkeyCacheFile::into_cache)
-            .map_err(Into::into)
-    }
-
-    /// Convert a cache using `File` backing to one using `Database` backing.
-    ///
-    /// This will write all of the keys from `existing_cache` to `store`.
-    pub fn convert(existing_cache: Self, store: BeaconStore<T>) -> Result<Self, BeaconChainError> {
-        let mut result = ValidatorPubkeyCache {
-            pubkeys: Vec::with_capacity(existing_cache.pubkeys.len()),
-            indices: HashMap::with_capacity(existing_cache.indices.len()),
-            pubkey_bytes: Vec::with_capacity(existing_cache.indices.len()),
-            backing: PubkeyCacheBacking::Database(store),
-        };
-
-        result.import(existing_cache.pubkeys.iter().map(PublicKeyBytes::from))?;
-
-        Ok(result)
-    }
-
     /// Scan the given `state` and add any new validator public keys.
     ///
     /// Does not delete any keys from `self` if they don't appear in `state`.
@@ -146,14 +116,8 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
             // The motivation behind this ordering is that we do not want to have states that
             // reference a pubkey that is not in our cache. However, it's fine to have pubkeys
            // that are never referenced in a state.
-            match &mut self.backing {
-                PubkeyCacheBacking::File(persistence_file) => {
-                    persistence_file.append(i, &pubkey)?;
-                }
-                PubkeyCacheBacking::Database(store) => {
-                    store.put_item(&DatabasePubkey::key_for_index(i), &DatabasePubkey(pubkey))?;
-                }
-            }
+            self.store
+                .put_item(&DatabasePubkey::key_for_index(i), &DatabasePubkey(pubkey))?;
 
             self.pubkeys.push(
                 (&pubkey)
@@ -219,105 +183,6 @@ impl DatabasePubkey {
     }
 }
 
-/// Allows for maintaining an on-disk copy of the `ValidatorPubkeyCache`. The file is raw SSZ bytes
-/// (not ASCII encoded).
-///
-/// ## Writes
-///
-/// Each entry is simply appended to the file.
-///
-/// ## Reads
-///
-/// The whole file is parsed as an SSZ "variable list" of objects.
-///
-/// This parsing method is possible because the items in the list are fixed-length SSZ objects.
-struct ValidatorPubkeyCacheFile(File);
-
-#[derive(Debug)]
-enum Error {
-    Io(io::Error),
-    Ssz(DecodeError),
-    PubkeyDecode(bls::Error),
-    /// The file read from disk does not have a contiguous list of validator public keys. The file
-    /// has become corrupted.
-    InconsistentIndex {
-        _expected: Option<usize>,
-        _found: usize,
-    },
-}
-
-impl From<Error> for BeaconChainError {
-    fn from(e: Error) -> BeaconChainError {
-        BeaconChainError::ValidatorPubkeyCacheFileError(format!("{:?}", e))
-    }
-}
-
-impl ValidatorPubkeyCacheFile {
-    /// Opens an existing file for reading and writing.
-    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
-        File::options()
-            .read(true)
-            .write(true)
-            .create(false)
-            .append(true)
-            .open(path)
-            .map(Self)
-            .map_err(Error::Io)
-    }
-
-    /// Append a public key to file.
-    ///
-    /// The provided `index` should each be one greater than the previous and start at 0.
-    /// Otherwise, the file will become corrupted and unable to be converted into a cache.
-    pub fn append(&mut self, index: usize, pubkey: &PublicKeyBytes) -> Result<(), Error> {
-        append_to_file(&mut self.0, index, pubkey)
-    }
-
-    /// Creates a `ValidatorPubkeyCache` by reading and parsing the underlying file.
-    pub fn into_cache<T: BeaconChainTypes>(mut self) -> Result<ValidatorPubkeyCache<T>, Error> {
-        let mut bytes = vec![];
-        self.0.read_to_end(&mut bytes).map_err(Error::Io)?;
-
-        let list: Vec<(usize, PublicKeyBytes)> = Vec::from_ssz_bytes(&bytes).map_err(Error::Ssz)?;
-
-        let mut last = None;
-        let mut pubkeys = Vec::with_capacity(list.len());
-        let mut indices = HashMap::with_capacity(list.len());
-        let mut pubkey_bytes = Vec::with_capacity(list.len());
-
-        for (index, pubkey) in list {
-            let expected = last.map(|n| n + 1);
-            if expected.map_or(true, |expected| index == expected) {
-                last = Some(index);
-                pubkeys.push((&pubkey).try_into().map_err(Error::PubkeyDecode)?);
-                pubkey_bytes.push(pubkey);
-                indices.insert(pubkey, index);
-            } else {
-                return Err(Error::InconsistentIndex {
-                    _expected: expected,
-                    _found: index,
-                });
-            }
-        }
-
-        Ok(ValidatorPubkeyCache {
-            pubkeys,
-            indices,
-            pubkey_bytes,
-            backing: PubkeyCacheBacking::File(self),
-        })
-    }
-}
-
-fn append_to_file(file: &mut File, index: usize, pubkey: &PublicKeyBytes) -> Result<(), Error> {
-    let mut line = Vec::with_capacity(index.ssz_bytes_len() + pubkey.ssz_bytes_len());
-    index.ssz_append(&mut line);
-    pubkey.ssz_append(&mut line);
-    file.write_all(&line).map_err(Error::Io)
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
@@ -325,10 +190,7 @@ mod test {
     use logging::test_logger;
     use std::sync::Arc;
     use store::HotColdDB;
-    use tempfile::tempdir;
-    use types::{
-        test_utils::generate_deterministic_keypair, BeaconState, EthSpec, Keypair, MainnetEthSpec,
-    };
+    use types::{BeaconState, EthSpec, Keypair, MainnetEthSpec};
 
     type E = MainnetEthSpec;
     type T = EphemeralHarnessType<E>;
@@ -422,7 +284,7 @@ mod test {
         check_cache_get(&cache, &keypairs[..]);
         drop(cache);
 
-        // Re-init the cache from the file.
+        // Re-init the cache from the store.
         let mut cache =
             ValidatorPubkeyCache::load_from_store(store.clone()).expect("should open cache");
         check_cache_get(&cache, &keypairs[..]);
@@ -435,36 +297,8 @@ mod test {
         check_cache_get(&cache, &keypairs[..]);
        drop(cache);
 
-        // Re-init the cache from the file.
+        // Re-init the cache from the store.
         let cache = ValidatorPubkeyCache::load_from_store(store).expect("should open cache");
         check_cache_get(&cache, &keypairs[..]);
     }
-
-    #[test]
-    fn invalid_persisted_file() {
-        let dir = tempdir().expect("should create tempdir");
-        let path = dir.path().join("cache.ssz");
-        let pubkey = generate_deterministic_keypair(0).pk.into();
-
-        let mut file = File::create(&path).expect("should create file");
-        append_to_file(&mut file, 0, &pubkey).expect("should write to file");
-        drop(file);
-
-        let cache = ValidatorPubkeyCache::<T>::load_from_file(&path).expect("should open cache");
-        drop(cache);
-
-        let mut file = File::options()
-            .write(true)
-            .append(true)
-            .open(&path)
-            .expect("should open file");
-        append_to_file(&mut file, 42, &pubkey).expect("should write bad data to file");
-        drop(file);
-
-        assert!(
-            ValidatorPubkeyCache::<T>::load_from_file(&path).is_err(),
-            "should not parse invalid file"
-        );
-    }
 }
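With the `File` backing gone, the cache persists one row per validator index, which is why `load_from_store` can rebuild it by scanning indices from zero upwards. A rough, self-contained sketch of that keying scheme (toy types throughout; the real cache stores SSZ-encoded `DatabasePubkey` items in the beacon store):

```rust
// Sketch of the store-backed keying that replaces the deleted file backing.
use std::collections::HashMap;

// Stand-in for a BLS public key (48 compressed bytes).
type PublicKeyBytes = [u8; 48];

/// One store row per validator index.
#[derive(Default)]
struct Store {
    rows: HashMap<u64, PublicKeyBytes>,
}

impl Store {
    fn put_item(&mut self, key: u64, pubkey: PublicKeyBytes) {
        self.rows.insert(key, pubkey);
    }
    fn get_item(&self, key: u64) -> Option<PublicKeyBytes> {
        self.rows.get(&key).copied()
    }
}

/// Mirrors `DatabasePubkey::key_for_index`: the key is derived from the index,
/// so the cache can be rebuilt by scanning 0, 1, 2, ... until the first gap.
fn key_for_index(index: usize) -> u64 {
    index as u64
}

fn load_from_store(store: &Store) -> Vec<PublicKeyBytes> {
    let mut pubkeys = vec![];
    for index in 0usize.. {
        match store.get_item(key_for_index(index)) {
            Some(pubkey) => pubkeys.push(pubkey),
            // The first missing index marks the end of the cache (the `} else {`
            // branch visible in the diff above presumably stops the scan here).
            None => break,
        }
    }
    pubkeys
}

fn main() {
    let mut store = Store::default();
    store.put_item(key_for_index(0), [1; 48]);
    store.put_item(key_for_index(1), [2; 48]);
    assert_eq!(load_from_store(&store).len(), 2);
}
```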

@@ -8,9 +8,7 @@ mod sync_aggregate_id;
 pub use attestation::AttMaxCover;
 pub use max_cover::MaxCover;
-pub use persistence::{
-    PersistedOperationPool, PersistedOperationPoolAltair, PersistedOperationPoolBase,
-};
+pub use persistence::{PersistedOperationPool, PersistedOperationPoolAltair};
 
 use crate::sync_aggregate_id::SyncAggregateId;
 use attestation_id::AttestationId;

@@ -17,7 +17,7 @@ type PersistedSyncContributions<T> = Vec<(SyncAggregateId, Vec<SyncCommitteeCont
 /// Operations are stored in arbitrary order, so it's not a good idea to compare instances
 /// of this type (or its encoded form) for equality. Convert back to an `OperationPool` first.
 #[superstruct(
-    variants(Base, Altair),
+    variants(Altair),
     variant_attributes(
         derive(Derivative, PartialEq, Debug, Serialize, Deserialize, Encode, Decode),
         serde(bound = "T: EthSpec", deny_unknown_fields),
@@ -46,9 +46,7 @@ pub struct PersistedOperationPool<T: EthSpec> {
 }
 
 impl<T: EthSpec> PersistedOperationPool<T> {
-    /// Convert an `OperationPool` into serializable form. Always converts to
-    /// `PersistedOperationPool::Altair` because the v3 to v4 database schema migration ensures
-    /// the op pool is always persisted as the Altair variant.
+    /// Convert an `OperationPool` into serializable form.
     pub fn from_operation_pool(operation_pool: &OperationPool<T>) -> Self {
         let attestations = operation_pool
             .attestations
@@ -114,14 +112,6 @@ impl<T: EthSpec> PersistedOperationPool<T> {
                 .collect(),
         );
         let op_pool = match self {
-            PersistedOperationPool::Base(_) => OperationPool {
-                attestations,
-                sync_contributions: <_>::default(),
-                attester_slashings,
-                proposer_slashings,
-                voluntary_exits,
-                _phantom: Default::default(),
-            },
             PersistedOperationPool::Altair(_) => {
                 let sync_contributions =
                     RwLock::new(self.sync_contributions()?.iter().cloned().collect());
@@ -138,44 +128,9 @@ impl<T: EthSpec> PersistedOperationPool<T> {
         };
 
         Ok(op_pool)
     }
-
-    /// Convert the `PersistedOperationPool::Base` variant to `PersistedOperationPool::Altair` by
-    /// setting `sync_contributions` to its default.
-    pub fn base_to_altair(self) -> Self {
-        match self {
-            PersistedOperationPool::Base(_) => {
-                PersistedOperationPool::Altair(PersistedOperationPoolAltair {
-                    attestations: self.attestations().to_vec(),
-                    sync_contributions: <_>::default(),
-                    attester_slashings: self.attester_slashings().to_vec(),
-                    proposer_slashings: self.proposer_slashings().to_vec(),
-                    voluntary_exits: self.voluntary_exits().to_vec(),
-                })
-            }
-            PersistedOperationPool::Altair(_) => self,
-        }
-    }
 }
 
-/// This `StoreItem` implementation is necessary for migrating the `PersistedOperationPool`
-/// in the v3 to v4 database schema migration.
-impl<T: EthSpec> StoreItem for PersistedOperationPoolBase<T> {
-    fn db_column() -> DBColumn {
-        DBColumn::OpPool
-    }
-
-    fn as_store_bytes(&self) -> Vec<u8> {
-        self.as_ssz_bytes()
-    }
-
-    fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
-        Self::from_ssz_bytes(bytes).map_err(Into::into)
-    }
-}
-
-/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::Altair`
-/// because the v3 to v4 database schema migration ensures the persisted op pool is always stored
-/// in the Altair format.
+/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::Altair`.
 impl<T: EthSpec> StoreItem for PersistedOperationPool<T> {
     fn db_column() -> DBColumn {
         DBColumn::OpPool