diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 494547115..553c4f78a 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -175,6 +175,14 @@ pub type BeaconForkChoice<T> = ForkChoice<
     <T as BeaconChainTypes>::EthSpec,
 >;
 
+pub type BeaconStore<T> = Arc<
+    HotColdDB<
+        <T as BeaconChainTypes>::EthSpec,
+        <T as BeaconChainTypes>::HotStore,
+        <T as BeaconChainTypes>::ColdStore,
+    >,
+>;
+
 /// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
 /// operations and chooses a canonical head.
 pub struct BeaconChain<T: BeaconChainTypes> {
@@ -182,7 +190,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
     /// Configuration for `BeaconChain` runtime behaviour.
     pub config: ChainConfig,
     /// Persistent storage for blocks, states, etc. Typically an on-disk store, such as LevelDB.
-    pub store: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
+    pub store: BeaconStore<T>,
     /// Database migrator for running background maintenance on the store.
     pub store_migrator: BackgroundMigrator<T::EthSpec, T::HotStore, T::ColdStore>,
     /// Reports the current slot, typically based upon the system clock.
@@ -237,7 +245,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
     /// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
     pub(crate) beacon_proposer_cache: Mutex<BeaconProposerCache>,
     /// Caches a map of `validator_index -> validator_pubkey`.
-    pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache>,
+    pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache<T>>,
     /// A list of any hard-coded forks that have been disabled.
     pub disabled_forks: Vec<String>,
     /// Sender given to tasks, so that if they encounter a state in which execution cannot
@@ -300,9 +308,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     }
 
     /// Load fork choice from disk, returning `None` if it isn't found.
-    pub fn load_fork_choice(
-        store: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
-    ) -> Result<Option<BeaconForkChoice<T>>, Error> {
+    pub fn load_fork_choice(store: BeaconStore<T>) -> Result<Option<BeaconForkChoice<T>>, Error> {
         let persisted_fork_choice =
             match store.get_item::<PersistedForkChoice>(&FORK_CHOICE_DB_KEY)? {
                 Some(fc) => fc,
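The `BeaconStore<T>` alias introduced above exists to shorten signatures such as `load_fork_choice`, which previously had to spell out all three associated-type projections of `BeaconChainTypes`. A minimal, self-contained sketch of the pattern; the trait and struct names here are stand-ins, not Lighthouse's real definitions:

    use std::marker::PhantomData;
    use std::sync::Arc;

    // Stand-in for `BeaconChainTypes`: a "witness" trait bundling associated types.
    trait ChainTypes {
        type EthSpec;
        type HotStore;
        type ColdStore;
    }

    // Stand-in for `HotColdDB`.
    struct Db<E, Hot, Cold>(PhantomData<(E, Hot, Cold)>);

    // Analogue of `BeaconStore<T>`: one parameter instead of three projections.
    type Store<T> = Arc<
        Db<
            <T as ChainTypes>::EthSpec,
            <T as ChainTypes>::HotStore,
            <T as ChainTypes>::ColdStore,
        >,
    >;

    // Signatures collapse from `Arc<Db<T::EthSpec, T::HotStore, T::ColdStore>>`
    // to `Store<T>`, as `load_fork_choice` now does.
    fn open<T: ChainTypes>() -> Store<T> {
        Arc::new(Db(PhantomData))
    }

    fn main() {
        struct Mainnet;
        impl ChainTypes for Mainnet {
            type EthSpec = ();
            type HotStore = ();
            type ColdStore = ();
        }
        let _store: Store<Mainnet> = open::<Mainnet>();
    }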
diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs
index f6fba1409..4433448d7 100644
--- a/beacon_node/beacon_chain/src/block_verification.rs
+++ b/beacon_node/beacon_chain/src/block_verification.rs
@@ -1336,7 +1336,7 @@ fn cheap_state_advance_to_obtain_committees<'a, E: EthSpec>(
 /// Obtains a read-locked `ValidatorPubkeyCache` from the `chain`.
 fn get_validator_pubkey_cache<T: BeaconChainTypes>(
     chain: &BeaconChain<T>,
-) -> Result<RwLockReadGuard<ValidatorPubkeyCache>, BlockError<T::EthSpec>> {
+) -> Result<RwLockReadGuard<ValidatorPubkeyCache<T>>, BlockError<T::EthSpec>> {
     chain
         .validator_pubkey_cache
         .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
@@ -1348,11 +1348,11 @@ fn get_validator_pubkey_cache<T: BeaconChainTypes>(
 ///
 /// The signature verifier is empty because it does not yet have any of this block's signatures
 /// added to it. Use `Self::apply_to_signature_verifier` to apply the signatures.
-fn get_signature_verifier<'a, E: EthSpec>(
-    state: &'a BeaconState<E>,
-    validator_pubkey_cache: &'a ValidatorPubkeyCache,
+fn get_signature_verifier<'a, T: BeaconChainTypes>(
+    state: &'a BeaconState<T::EthSpec>,
+    validator_pubkey_cache: &'a ValidatorPubkeyCache<T>,
     spec: &'a ChainSpec,
-) -> BlockSignatureVerifier<'a, E, impl Fn(usize) -> Option<Cow<'a, PublicKey>> + Clone> {
+) -> BlockSignatureVerifier<'a, T::EthSpec, impl Fn(usize) -> Option<Cow<'a, PublicKey>> + Clone> {
     BlockSignatureVerifier::new(
         state,
         move |validator_index| {
diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs
index d63c28c01..238db2b89 100644
--- a/beacon_node/beacon_chain/src/builder.rs
+++ b/beacon_node/beacon_chain/src/builder.rs
@@ -31,8 +31,6 @@ use types::{
     SignedBeaconBlock, Slot,
 };
 
-pub const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz";
-
 /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
 /// functionality and only exists to satisfy the type system.
 pub struct Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>(
@@ -81,8 +79,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
     shutdown_sender: Option<Sender<&'static str>>,
     head_tracker: Option<HeadTracker>,
     data_dir: Option<PathBuf>,
-    pubkey_cache_path: Option<PathBuf>,
-    validator_pubkey_cache: Option<ValidatorPubkeyCache>,
+    validator_pubkey_cache: Option<ValidatorPubkeyCache<T>>,
     spec: ChainSpec,
     chain_config: ChainConfig,
     disabled_forks: Vec<String>,
@@ -119,7 +116,6 @@ where
             slot_clock: None,
             shutdown_sender: None,
             head_tracker: None,
-            pubkey_cache_path: None,
             data_dir: None,
             disabled_forks: Vec::new(),
             validator_pubkey_cache: None,
@@ -182,7 +178,6 @@ where
     ///
     /// Should generally be called early in the build chain.
     pub fn data_dir(mut self, path: PathBuf) -> Self {
-        self.pubkey_cache_path = Some(path.join(PUBKEY_CACHE_FILENAME));
         self.data_dir = Some(path);
         self
     }
@@ -224,11 +219,6 @@ where
     pub fn resume_from_db(mut self) -> Result<Self, String> {
         let log = self.log.as_ref().ok_or("resume_from_db requires a log")?;
 
-        let pubkey_cache_path = self
-            .pubkey_cache_path
-            .as_ref()
-            .ok_or("resume_from_db requires a data_dir")?;
-
         info!(
             log,
             "Starting beacon chain";
@@ -274,7 +264,7 @@ where
                 .unwrap_or_else(OperationPool::new),
         );
 
-        let pubkey_cache = ValidatorPubkeyCache::load_from_file(pubkey_cache_path)
+        let pubkey_cache = ValidatorPubkeyCache::load_from_store(store)
             .map_err(|e| format!("Unable to open persisted pubkey cache: {:?}", e))?;
 
         self.genesis_block_root = Some(chain.genesis_block_root);
@@ -496,12 +486,8 @@ where
             }
         }
 
-        let pubkey_cache_path = self
-            .pubkey_cache_path
-            .ok_or("Cannot build without a pubkey cache path")?;
-
         let validator_pubkey_cache = self.validator_pubkey_cache.map(Ok).unwrap_or_else(|| {
-            ValidatorPubkeyCache::new(&canonical_head.beacon_state, pubkey_cache_path)
+            ValidatorPubkeyCache::new(&canonical_head.beacon_state, store.clone())
                 .map_err(|e| format!("Unable to init validator pubkey cache: {:?}", e))
        })?;
diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs
index f0b941a49..27f999d30 100644
--- a/beacon_node/beacon_chain/src/lib.rs
+++ b/beacon_node/beacon_chain/src/lib.rs
@@ -20,6 +20,7 @@ mod observed_block_producers;
 pub mod observed_operations;
 mod persisted_beacon_chain;
 mod persisted_fork_choice;
+pub mod schema_change;
 mod shuffling_cache;
 mod snapshot_cache;
 pub mod state_advance_timer;
@@ -29,7 +30,7 @@ pub mod validator_monitor;
 mod validator_pubkey_cache;
 
 pub use self::beacon_chain::{
-    AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, ChainSegmentResult,
+    AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
    ForkChoiceError, StateSkipConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
 };
 pub use self::beacon_snapshot::BeaconSnapshot;
diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs
new file mode 100644
index 000000000..ca3e6efbb
--- /dev/null
+++ b/beacon_node/beacon_chain/src/schema_change.rs
@@ -0,0 +1,64 @@
+//! Utilities for managing database schema changes.
+use crate::beacon_chain::BeaconChainTypes;
+use crate::validator_pubkey_cache::ValidatorPubkeyCache;
+use std::fs;
+use std::path::Path;
+use std::sync::Arc;
+use store::hot_cold_store::{HotColdDB, HotColdDBError};
+use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
+use store::Error as StoreError;
+
+const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz";
+
+/// Migrate the database from one schema version to another, applying all requisite mutations.
+pub fn migrate_schema<T: BeaconChainTypes>(
+    db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
+    datadir: &Path,
+    from: SchemaVersion,
+    to: SchemaVersion,
+) -> Result<(), StoreError> {
+    match (from, to) {
+        // Migrating from the current schema version to itself is always OK, a no-op.
+        (_, _) if from == to && to == CURRENT_SCHEMA_VERSION => Ok(()),
+        // Migrate across multiple versions by recursively migrating one step at a time.
+        (_, _) if from.as_u64() + 1 < to.as_u64() => {
+            let next = SchemaVersion(from.as_u64() + 1);
+            migrate_schema::<T>(db.clone(), datadir, from, next)?;
+            migrate_schema::<T>(db, datadir, next, to)
+        }
+        // Migration from v0.3.0 to v0.3.x, adding the temporary states column.
+        // Nothing actually needs to be done, but once a DB uses v2 it shouldn't go back.
+        (SchemaVersion(1), SchemaVersion(2)) => {
+            db.store_schema_version(to)?;
+            Ok(())
+        }
+        // Migration for removing the pubkey cache.
+        (SchemaVersion(2), SchemaVersion(3)) => {
+            let pk_cache_path = datadir.join(PUBKEY_CACHE_FILENAME);
+
+            // Load from file, store to DB.
+            ValidatorPubkeyCache::<T>::load_from_file(&pk_cache_path)
+                .and_then(|cache| ValidatorPubkeyCache::convert(cache, db.clone()))
+                .map_err(|e| StoreError::SchemaMigrationError(format!("{:?}", e)))?;
+
+            db.store_schema_version(to)?;
+
+            // Delete cache file now that keys are stored in the DB.
+            fs::remove_file(&pk_cache_path).map_err(|e| {
+                StoreError::SchemaMigrationError(format!(
+                    "unable to delete {}: {:?}",
+                    pk_cache_path.display(),
+                    e
+                ))
+            })?;
+
+            Ok(())
+        }
+        // Anything else is an error.
+        (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
+            target_version: to,
+            current_version: from,
+        }
+        .into()),
+    }
+}
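The interesting arm in the new `migrate_schema` is the recursive one: a request to jump several versions decomposes into single-step migrations, so each schema bump only ever needs one `(n, n + 1)` handler. A self-contained model of that control flow (names are illustrative; the real function also threads the database handle and data directory through each step):

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct Version(u64);

    fn migrate(from: Version, to: Version, applied: &mut Vec<(u64, u64)>) -> Result<(), String> {
        match (from, to) {
            // No-op (the real code also requires `to == CURRENT_SCHEMA_VERSION`).
            (f, t) if f == t => Ok(()),
            // Multi-step jump: recurse one step at a time, as above.
            (f, t) if f.0 + 1 < t.0 => {
                let next = Version(f.0 + 1);
                migrate(f, next, applied)?;
                migrate(next, t, applied)
            }
            // Single step: run that step's mutation (recorded here for inspection).
            (f, t) if f.0 + 1 == t.0 => {
                applied.push((f.0, t.0));
                Ok(())
            }
            // Anything else (e.g. a downgrade) is unsupported.
            _ => Err(format!("unsupported migration {:?} -> {:?}", from, to)),
        }
    }

    fn main() {
        let mut applied = vec![];
        migrate(Version(1), Version(3), &mut applied).unwrap();
        assert_eq!(applied, vec![(1, 2), (2, 3)]); // v1 -> v3 runs as two steps
        assert!(migrate(Version(3), Version(2), &mut vec![]).is_err()); // no downgrades
    }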
diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs
index e72a89ce7..28bcbf389 100644
--- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs
+++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs
@@ -1,11 +1,13 @@
 use crate::errors::BeaconChainError;
+use crate::{BeaconChainTypes, BeaconStore};
 use ssz::{Decode, DecodeError, Encode};
 use std::collections::HashMap;
 use std::convert::TryInto;
 use std::fs::{File, OpenOptions};
 use std::io::{self, Read, Write};
 use std::path::Path;
-use types::{BeaconState, EthSpec, PublicKey, PublicKeyBytes, Validator};
+use store::{DBColumn, Error as StoreError, StoreItem};
+use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};
 
 /// Provides a mapping of `validator_index -> validator_publickey`.
 ///
@@ -16,33 +18,35 @@ use types::{BeaconState, EthSpec, PublicKey, PublicKeyBytes, Validator};
 /// keys in compressed form and they are needed in decompressed form for signature verification.
 /// Decompression is expensive when many keys are involved.
 ///
-/// The cache has a `persistence_file` that it uses to maintain a persistent, on-disk
+/// The cache has a `backing` that it uses to maintain a persistent, on-disk
 /// copy of itself. This allows it to be restored between process invocations.
-pub struct ValidatorPubkeyCache {
+pub struct ValidatorPubkeyCache<T: BeaconChainTypes> {
     pubkeys: Vec<PublicKey>,
     indices: HashMap<PublicKeyBytes, usize>,
-    persitence_file: ValidatorPubkeyCacheFile,
+    backing: PubkeyCacheBacking<T>,
 }
 
-impl ValidatorPubkeyCache {
-    pub fn load_from_file<P: AsRef<Path>>(path: P) -> Result<Self, BeaconChainError> {
-        ValidatorPubkeyCacheFile::open(&path)
-            .and_then(ValidatorPubkeyCacheFile::into_cache)
-            .map_err(Into::into)
-    }
+/// Abstraction over on-disk backing.
+///
+/// `File` backing is legacy, `Database` is current.
+enum PubkeyCacheBacking<T: BeaconChainTypes> {
+    File(ValidatorPubkeyCacheFile),
+    Database(BeaconStore<T>),
+}
 
+impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
     /// Create a new public key cache using the keys in `state.validators`.
     ///
     /// Also creates a new persistence file, returning an error if there is already a file at
     /// `persistence_path`.
-    pub fn new<T: EthSpec, P: AsRef<Path>>(
-        state: &BeaconState<T>,
-        persistence_path: P,
+    pub fn new(
+        state: &BeaconState<T::EthSpec>,
+        store: BeaconStore<T>,
     ) -> Result<Self, BeaconChainError> {
         let mut cache = Self {
-            persitence_file: ValidatorPubkeyCacheFile::create(persistence_path)?,
             pubkeys: vec![],
             indices: HashMap::new(),
+            backing: PubkeyCacheBacking::Database(store),
         };
 
         cache.import_new_pubkeys(state)?;
@@ -50,33 +54,83 @@ impl ValidatorPubkeyCache {
         Ok(cache)
     }
 
+    /// Load the pubkey cache from the given on-disk database.
+    pub fn load_from_store(store: BeaconStore<T>) -> Result<Self, BeaconChainError> {
+        let mut pubkeys = vec![];
+        let mut indices = HashMap::new();
+
+        for validator_index in 0.. {
+            if let Some(DatabasePubkey(pubkey)) =
+                store.get_item(&DatabasePubkey::key_for_index(validator_index))?
+            {
+                pubkeys.push((&pubkey).try_into().map_err(Error::PubkeyDecode)?);
+                indices.insert(pubkey, validator_index);
+            } else {
+                break;
+            }
+        }
+
+        Ok(ValidatorPubkeyCache {
+            pubkeys,
+            indices,
+            backing: PubkeyCacheBacking::Database(store),
+        })
+    }
+
+    /// DEPRECATED: used only for migration
+    pub fn load_from_file<P: AsRef<Path>>(path: P) -> Result<Self, BeaconChainError> {
+        ValidatorPubkeyCacheFile::open(&path)
+            .and_then(ValidatorPubkeyCacheFile::into_cache)
+            .map_err(Into::into)
+    }
+
+    /// Convert a cache using `File` backing to one using `Database` backing.
+    ///
+    /// This will write all of the keys from `existing_cache` to `store`.
+    pub fn convert(existing_cache: Self, store: BeaconStore<T>) -> Result<Self, BeaconChainError> {
+        let mut result = ValidatorPubkeyCache {
+            pubkeys: Vec::with_capacity(existing_cache.pubkeys.len()),
+            indices: HashMap::with_capacity(existing_cache.indices.len()),
+            backing: PubkeyCacheBacking::Database(store),
+        };
+        result.import(existing_cache.pubkeys.iter().map(PublicKeyBytes::from))?;
+        Ok(result)
+    }
+
     /// Scan the given `state` and add any new validator public keys.
     ///
     /// Does not delete any keys from `self` if they don't appear in `state`.
-    pub fn import_new_pubkeys<T: EthSpec>(
+    pub fn import_new_pubkeys(
         &mut self,
-        state: &BeaconState<T>,
+        state: &BeaconState<T::EthSpec>,
     ) -> Result<(), BeaconChainError> {
         if state.validators.len() > self.pubkeys.len() {
-            self.import(&state.validators[self.pubkeys.len()..])
+            self.import(
+                state.validators[self.pubkeys.len()..]
+                    .iter()
+                    .map(|v| v.pubkey),
+            )
         } else {
             Ok(())
         }
     }
 
     /// Adds zero or more validators to `self`.
-    fn import(&mut self, validators: &[Validator]) -> Result<(), BeaconChainError> {
-        self.pubkeys.reserve(validators.len());
-        self.indices.reserve(validators.len());
+    fn import<I>(&mut self, validator_keys: I) -> Result<(), BeaconChainError>
+    where
+        I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator,
+    {
+        self.pubkeys.reserve(validator_keys.len());
+        self.indices.reserve(validator_keys.len());
 
-        for v in validators.iter() {
+        for pubkey in validator_keys {
             let i = self.pubkeys.len();
 
-            if self.indices.contains_key(&v.pubkey) {
+            if self.indices.contains_key(&pubkey) {
                 return Err(BeaconChainError::DuplicateValidatorPublicKey);
             }
 
-            // The item is written to disk (the persistence file) _before_ it is written into
+            // The item is written to disk _before_ it is written into
             // the local struct.
             //
             // This means that a pubkey cache read from disk will always be equivalent to or
@@ -85,15 +139,22 @@ impl ValidatorPubkeyCache {
             // The motivation behind this ordering is that we do not want to have states that
             // reference a pubkey that is not in our cache. However, it's fine to have pubkeys
             // that are never referenced in a state.
-            self.persitence_file.append(i, &v.pubkey)?;
+            match &mut self.backing {
+                PubkeyCacheBacking::File(persistence_file) => {
+                    persistence_file.append(i, &pubkey)?;
+                }
+                PubkeyCacheBacking::Database(store) => {
+                    store.put_item(&DatabasePubkey::key_for_index(i), &DatabasePubkey(pubkey))?;
+                }
+            }
 
             self.pubkeys.push(
-                (&v.pubkey)
+                (&pubkey)
                     .try_into()
                     .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?,
             );
 
-            self.indices.insert(v.pubkey, i);
+            self.indices.insert(pubkey, i);
         }
 
         Ok(())
@@ -115,6 +176,31 @@
     }
 }
 
+/// Wrapper for a public key stored in the database.
+///
+/// Keyed by the validator index as `Hash256::from_low_u64_be(index)`.
+struct DatabasePubkey(PublicKeyBytes);
+
+impl StoreItem for DatabasePubkey {
+    fn db_column() -> DBColumn {
+        DBColumn::PubkeyCache
+    }
+
+    fn as_store_bytes(&self) -> Vec<u8> {
+        self.0.as_ssz_bytes()
+    }
+
+    fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
+        Ok(Self(PublicKeyBytes::from_ssz_bytes(bytes)?))
+    }
+}
+
+impl DatabasePubkey {
+    fn key_for_index(index: usize) -> Hash256 {
+        Hash256::from_low_u64_be(index as u64)
+    }
+}
+
 /// Allows for maintaining an on-disk copy of the `ValidatorPubkeyCache`. The file is raw SSZ bytes
 /// (not ASCII encoded).
 ///
@@ -149,17 +235,6 @@ impl From<Error> for BeaconChainError {
     }
 }
 
 impl ValidatorPubkeyCacheFile {
-    /// Creates a file for reading and writing.
-    pub fn create<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
-        OpenOptions::new()
-            .create(true)
-            .write(true)
-            .truncate(true)
-            .open(path)
-            .map(Self)
-            .map_err(Error::Io)
-    }
-
     /// Opens an existing file for reading and writing.
     pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
         OpenOptions::new()
@@ -181,7 +256,7 @@
     }
 
     /// Creates a `ValidatorPubkeyCache` by reading and parsing the underlying file.
-    pub fn into_cache(mut self) -> Result<ValidatorPubkeyCache, Error> {
+    pub fn into_cache<T: BeaconChainTypes>(mut self) -> Result<ValidatorPubkeyCache<T>, Error> {
         let mut bytes = vec![];
         self.0.read_to_end(&mut bytes).map_err(Error::Io)?;
 
@@ -208,7 +283,7 @@
         Ok(ValidatorPubkeyCache {
             pubkeys,
             indices,
-            persitence_file: self,
+            backing: PubkeyCacheBacking::File(self),
         })
     }
 }
@@ -225,21 +300,33 @@ fn append_to_file(file: &mut File, index: usize, pubkey: &PublicKeyBytes) -> Result<(), Error> {
 #[cfg(test)]
 mod test {
     use super::*;
+    use crate::test_utils::{test_logger, EphemeralHarnessType};
+    use std::sync::Arc;
+    use store::HotColdDB;
     use tempfile::tempdir;
     use types::{
         test_utils::{generate_deterministic_keypair, TestingBeaconStateBuilder},
         BeaconState, EthSpec, Keypair, MainnetEthSpec,
     };
 
-    fn get_state(validator_count: usize) -> (BeaconState<MainnetEthSpec>, Vec<Keypair>) {
-        let spec = MainnetEthSpec::default_spec();
+    type E = MainnetEthSpec;
+    type T = EphemeralHarnessType<E>;
+
+    fn get_state(validator_count: usize) -> (BeaconState<E>, Vec<Keypair>) {
+        let spec = E::default_spec();
         let builder =
             TestingBeaconStateBuilder::from_deterministic_keypairs(validator_count, &spec);
         builder.build()
     }
 
+    fn get_store() -> BeaconStore<T> {
+        Arc::new(
+            HotColdDB::open_ephemeral(<_>::default(), E::default_spec(), test_logger()).unwrap(),
+        )
+    }
+
     #[allow(clippy::needless_range_loop)]
-    fn check_cache_get(cache: &ValidatorPubkeyCache, keypairs: &[Keypair]) {
+    fn check_cache_get(cache: &ValidatorPubkeyCache<T>, keypairs: &[Keypair]) {
         let validator_count = keypairs.len();
 
         for i in 0..validator_count + 1 {
@@ -270,10 +357,9 @@
     fn basic_operation() {
         let (state, keypairs) = get_state(8);
 
-        let dir = tempdir().expect("should create tempdir");
-        let path = dir.path().join("cache.ssz");
+        let store = get_store();
 
-        let mut cache = ValidatorPubkeyCache::new(&state, path).expect("should create cache");
+        let mut cache = ValidatorPubkeyCache::new(&state, store).expect("should create cache");
 
         check_cache_get(&cache, &keypairs[..]);
 
@@ -303,16 +389,16 @@
     fn persistence() {
         let (state, keypairs) = get_state(8);
 
-        let dir = tempdir().expect("should create tempdir");
-        let path = dir.path().join("cache.ssz");
+        let store = get_store();
 
         // Create a new cache.
-        let cache = ValidatorPubkeyCache::new(&state, &path).expect("should create cache");
+        let cache = ValidatorPubkeyCache::new(&state, store.clone()).expect("should create cache");
         check_cache_get(&cache, &keypairs[..]);
         drop(cache);
 
         // Re-init the cache from the file.
-        let mut cache = ValidatorPubkeyCache::load_from_file(&path).expect("should open cache");
+        let mut cache =
+            ValidatorPubkeyCache::load_from_store(store.clone()).expect("should open cache");
         check_cache_get(&cache, &keypairs[..]);
 
         // Add some more keypairs.
@@ -324,7 +410,7 @@
         drop(cache);
 
         // Re-init the cache from the file.
-        let cache = ValidatorPubkeyCache::load_from_file(&path).expect("should open cache");
+        let cache = ValidatorPubkeyCache::load_from_store(store).expect("should open cache");
         check_cache_get(&cache, &keypairs[..]);
     }
 
@@ -338,7 +424,7 @@
         append_to_file(&mut file, 0, &pubkey).expect("should write to file");
         drop(file);
 
-        let cache = ValidatorPubkeyCache::load_from_file(&path).expect("should open cache");
+        let cache = ValidatorPubkeyCache::<T>::load_from_file(&path).expect("should open cache");
         drop(cache);
 
         let mut file = OpenOptions::new()
@@ -351,7 +437,7 @@
         drop(file);
 
         assert!(
-            ValidatorPubkeyCache::load_from_file(&path).is_err(),
+            ValidatorPubkeyCache::<T>::load_from_file(&path).is_err(),
             "should not parse invalid file"
         );
     }
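Two details of the database-backed cache are worth noting: keys are the validator index packed into the low bytes of a 32-byte value (`Hash256::from_low_u64_be`), and because `import` only ever appends key `i == pubkeys.len()`, stored indices are contiguous, which is what lets `load_from_store` scan from zero and stop at the first gap. A standalone sketch of that layout, with a `HashMap` standing in for the real `HotColdDB` column:

    use std::collections::HashMap;

    type Key = [u8; 32];

    // Model of `DatabasePubkey::key_for_index`: index in the low 8 bytes, big-endian.
    fn key_for_index(index: usize) -> Key {
        let mut key = [0u8; 32];
        key[24..].copy_from_slice(&(index as u64).to_be_bytes());
        key
    }

    // Model of `load_from_store`: contiguous indices mean the first gap ends the scan.
    fn load(db: &HashMap<Key, Vec<u8>>) -> Vec<Vec<u8>> {
        let mut pubkeys = vec![];
        for index in 0.. {
            match db.get(&key_for_index(index)) {
                Some(bytes) => pubkeys.push(bytes.clone()),
                None => break,
            }
        }
        pubkeys
    }

    fn main() {
        let mut db = HashMap::new();
        db.insert(key_for_index(0), vec![0xaa]);
        db.insert(key_for_index(1), vec![0xbb]);
        assert_eq!(load(&db).len(), 2);
    }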
diff --git a/beacon_node/beacon_chain/tests/op_verification.rs b/beacon_node/beacon_chain/tests/op_verification.rs
index 0f4d5b1a9..8d86d01ce 100644
--- a/beacon_node/beacon_chain/tests/op_verification.rs
+++ b/beacon_node/beacon_chain/tests/op_verification.rs
@@ -37,10 +37,8 @@ fn get_store(db_path: &TempDir) -> Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>> {
     let cold_path = db_path.path().join("cold_db");
     let config = StoreConfig::default();
     let log = NullLoggerBuilder.build().expect("logger should build");
-    Arc::new(
-        HotColdDB::open(&hot_path, &cold_path, config, spec, log)
-            .expect("disk store should initialize"),
-    )
+    HotColdDB::open(&hot_path, &cold_path, |_, _, _| Ok(()), config, spec, log)
+        .expect("disk store should initialize")
 }
 
 fn get_harness(store: Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>>, validator_count: usize) -> TestHarness {
diff --git a/beacon_node/beacon_chain/tests/persistence_tests.rs b/beacon_node/beacon_chain/tests/persistence_tests.rs
index 18ecaaf0b..0f5aa8a6b 100644
--- a/beacon_node/beacon_chain/tests/persistence_tests.rs
+++ b/beacon_node/beacon_chain/tests/persistence_tests.rs
@@ -29,10 +29,8 @@ fn get_store(db_path: &TempDir) -> Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>> {
     let cold_path = db_path.path().join("cold_db");
     let config = StoreConfig::default();
     let log = NullLoggerBuilder.build().expect("logger should build");
-    Arc::new(
-        HotColdDB::open(&hot_path, &cold_path, config, spec, log)
-            .expect("disk store should initialize"),
-    )
+    HotColdDB::open(&hot_path, &cold_path, |_, _, _| Ok(()), config, spec, log)
+        .expect("disk store should initialize")
 }
 
 #[test]
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index 63182cc9f..7a7ea49c7 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -40,10 +40,8 @@ fn get_store(db_path: &TempDir) -> Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>> {
     let config = StoreConfig::default();
     let log = test_logger();
 
-    Arc::new(
-        HotColdDB::open(&hot_path, &cold_path, config, spec, log)
-            .expect("disk store should initialize"),
-    )
+    HotColdDB::open(&hot_path, &cold_path, |_, _, _| Ok(()), config, spec, log)
+        .expect("disk store should initialize")
 }
 
 fn get_harness(
diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs
index 200869ec4..a80fde403 100644
--- a/beacon_node/client/src/builder.rs
+++ b/beacon_node/client/src/builder.rs
@@ -1,6 +1,7 @@
 use crate::config::{ClientGenesis, Config as ClientConfig};
 use crate::notifier::spawn_notifier;
 use crate::Client;
+use beacon_chain::schema_change::migrate_schema;
 use beacon_chain::{
     builder::{BeaconChainBuilder, Witness},
     eth1_chain::{CachingEth1Backend, Eth1Chain},
@@ -544,6 +545,7 @@ where
     /// Specifies that the `Client` should use a `HotColdDB` database.
     pub fn disk_store(
         mut self,
+        datadir: &Path,
         hot_path: &Path,
         cold_path: &Path,
         config: StoreConfig,
@@ -561,9 +563,20 @@ where
         self.db_path = Some(hot_path.into());
         self.freezer_db_path = Some(cold_path.into());
 
-        let store = HotColdDB::open(hot_path, cold_path, config, spec, context.log().clone())
-            .map_err(|e| format!("Unable to open database: {:?}", e))?;
-        self.store = Some(Arc::new(store));
+        let schema_upgrade = |db, from, to| {
+            migrate_schema::<Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>>(db, datadir, from, to)
+        };
+
+        let store = HotColdDB::open(
+            hot_path,
+            cold_path,
+            schema_upgrade,
+            config,
+            spec,
+            context.log().clone(),
+        )
+        .map_err(|e| format!("Unable to open database: {:?}", e))?;
+        self.store = Some(store);
         Ok(self)
     }
 }
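The new `HotColdDB::open` signature is a small dependency inversion: the `store` crate no longer knows how to migrate (that logic now lives in `beacon_chain::schema_change`); it just accepts a one-shot hook and invokes it when the on-disk version differs from the current one. Callers without beacon-chain context, like the test helpers above, pass `|_, _, _| Ok(())`, while the client wires in the real `migrate_schema`. A minimal model of the shape (all names illustrative):

    use std::sync::Arc;

    struct Db;

    fn open(
        migrate: impl FnOnce(Arc<Db>, u64, u64) -> Result<(), String>,
        on_disk_version: u64,
        current_version: u64,
    ) -> Result<Arc<Db>, String> {
        let db = Arc::new(Db);
        if on_disk_version != current_version {
            // The hook gets a handle to the freshly opened DB, as in the real `open`.
            migrate(db.clone(), on_disk_version, current_version)?;
        }
        Ok(db)
    }

    fn main() -> Result<(), String> {
        // Test-style no-op hook, as in the `get_store` helpers above.
        let _db = open(|_, _, _| Ok(()), 3, 3)?;
        // Production-style hook that performs a real migration.
        let _db = open(
            |_db, from, to| {
                println!("migrating schema {} -> {}", from, to);
                Ok(())
            },
            2,
            3,
        )?;
        Ok(())
    }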
diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs
index 24e02f401..32d874673 100644
--- a/beacon_node/src/config.rs
+++ b/beacon_node/src/config.rs
@@ -1,4 +1,3 @@
-use beacon_chain::builder::PUBKEY_CACHE_FILENAME;
 use clap::ArgMatches;
 use clap_utils::BAD_TESTNET_DIR_MESSAGE;
 use client::{ClientConfig, ClientGenesis};
@@ -45,13 +44,6 @@ pub fn get_config<E: EthSpec>(
                 .ok_or("Failed to get freezer db path")?,
         )
         .map_err(|err| format!("Failed to remove chain_db: {}", err))?;
-
-        // Remove the pubkey cache file if it exists
-        let pubkey_cache_file = client_config.data_dir.join(PUBKEY_CACHE_FILENAME);
-        if pubkey_cache_file.exists() {
-            fs::remove_file(&pubkey_cache_file)
-                .map_err(|e| format!("Failed to remove {:?}: {:?}", pubkey_cache_file, e))?;
-        }
     }
 
     // Create `datadir` and any non-existing parent directories.
diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs
index 7671c15ea..4c8610b6d 100644
--- a/beacon_node/src/lib.rs
+++ b/beacon_node/src/lib.rs
@@ -58,10 +58,10 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
         mut client_config: ClientConfig,
     ) -> Result<Self, String> {
         let spec = context.eth2_config().spec.clone();
-        let client_config_1 = client_config.clone();
         let client_genesis = client_config.genesis.clone();
         let store_config = client_config.store.clone();
         let log = context.log().clone();
+        let datadir = client_config.create_data_dir()?;
         let db_path = client_config.create_db_path()?;
         let freezer_db_path_res = client_config.create_freezer_db_path();
         let executor = context.executor.clone();
@@ -70,7 +70,7 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
             .runtime_context(context)
             .chain_spec(spec)
             .http_api_config(client_config.http_api.clone())
-            .disk_store(&db_path, &freezer_db_path_res?, store_config)?;
+            .disk_store(&datadir, &db_path, &freezer_db_path_res?, store_config)?;
 
         let builder = if let Some(slasher_config) = client_config.slasher.clone() {
             let slasher = Arc::new(
@@ -83,7 +83,7 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
         };
 
         let builder = builder
-            .beacon_chain_builder(client_genesis, client_config_1)
+            .beacon_chain_builder(client_genesis, client_config.clone())
             .await?;
         let builder = if client_config.sync_eth1_chain && !client_config.dummy_eth1_backend {
             info!(
diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs
index 622cd2ac7..f943a983e 100644
--- a/beacon_node/store/src/errors.rs
+++ b/beacon_node/store/src/errors.rs
@@ -19,6 +19,7 @@ pub enum Error {
     NoContinuationData,
     SplitPointModified(Slot, Slot),
     ConfigError(StoreConfigError),
+    SchemaMigrationError(String),
 }
 
 impl From<DecodeError> for Error {
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 31785bdf7..6eb2fe792 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -135,16 +135,20 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
     /// Open a new or existing database, with the given paths to the hot and cold DBs.
     ///
     /// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`.
+    ///
+    /// The `migrate_schema` function is passed in so that the parent `BeaconChain` can provide
+    /// context and access `BeaconChain`-level code without creating a circular dependency.
     pub fn open(
         hot_path: &Path,
         cold_path: &Path,
+        migrate_schema: impl FnOnce(Arc<Self>, SchemaVersion, SchemaVersion) -> Result<(), Error>,
         config: StoreConfig,
         spec: ChainSpec,
         log: Logger,
-    ) -> Result<HotColdDB<E, LevelDB<E>, LevelDB<E>>, Error> {
+    ) -> Result<Arc<Self>, Error> {
         Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
 
-        let db = HotColdDB {
+        let db = Arc::new(HotColdDB {
             split: RwLock::new(Split::default()),
             cold_db: LevelDB::open(cold_path)?,
             hot_db: LevelDB::open(hot_path)?,
@@ -153,7 +157,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             spec,
             log,
             _phantom: PhantomData,
-        };
+        });
 
         // Ensure that the schema version of the on-disk database matches the software.
         // If the version is mismatched, an automatic migration will be attempted.
@@ -164,7 +168,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
                 "from_version" => schema_version.as_u64(),
                 "to_version" => CURRENT_SCHEMA_VERSION.as_u64(),
             );
-            db.migrate_schema(schema_version, CURRENT_SCHEMA_VERSION)?;
+            migrate_schema(db.clone(), schema_version, CURRENT_SCHEMA_VERSION)?;
         } else {
             db.store_schema_version(CURRENT_SCHEMA_VERSION)?;
         }
@@ -830,7 +834,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     }
 
     /// Store the database schema version.
-    pub(crate) fn store_schema_version(&self, schema_version: SchemaVersion) -> Result<(), Error> {
+    pub fn store_schema_version(&self, schema_version: SchemaVersion) -> Result<(), Error> {
         self.hot_db.put(&SCHEMA_VERSION_KEY, &schema_version)
     }
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index 264f0e94c..a5657ac05 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -20,10 +20,9 @@ pub mod hot_cold_store;
 mod impls;
 mod leveldb_store;
 mod memory_store;
-mod metadata;
+pub mod metadata;
 mod metrics;
 mod partial_beacon_state;
-mod schema_change;
 
 pub mod iter;
 
@@ -153,6 +152,7 @@ pub enum DBColumn {
     OpPool,
     Eth1Cache,
     ForkChoice,
+    PubkeyCache,
     /// For the table mapping restore point numbers to state roots.
     BeaconRestorePoint,
     /// For the mapping from state roots to their slots or summaries.
@@ -178,6 +178,7 @@ impl Into<&'static str> for DBColumn {
             DBColumn::OpPool => "opo",
             DBColumn::Eth1Cache => "etc",
             DBColumn::ForkChoice => "frk",
+            DBColumn::PubkeyCache => "pkc",
             DBColumn::BeaconRestorePoint => "brp",
             DBColumn::BeaconStateSummary => "bss",
             DBColumn::BeaconStateTemporary => "bst",
diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs
index 3664443a0..45d159c08 100644
--- a/beacon_node/store/src/metadata.rs
+++ b/beacon_node/store/src/metadata.rs
@@ -2,7 +2,7 @@ use crate::{DBColumn, Error, StoreItem};
 use ssz::{Decode, Encode};
 use types::{Checkpoint, Hash256};
 
-pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(2);
+pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(3);
 
 // All the keys that get stored under the `BeaconMeta` column.
 //
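The new `PubkeyCache` variant maps to the short column tag "pkc". In this store design each `DBColumn` tag namespaces its keys, so two items with identical 32-byte keys but different columns never collide; that is what allows `DatabasePubkey` to reuse plain validator indices as keys. A toy model of column-prefixed keying (an assumed sketch of the behavior, with a plain `HashMap` in place of LevelDB):

    use std::collections::HashMap;

    // Each logical column ("pkc", "frk", ...) prefixes its keys.
    struct Store {
        kv: HashMap<Vec<u8>, Vec<u8>>,
    }

    impl Store {
        fn namespaced(column: &str, key: &[u8]) -> Vec<u8> {
            let mut k = column.as_bytes().to_vec();
            k.extend_from_slice(key);
            k
        }

        fn put(&mut self, column: &str, key: &[u8], value: Vec<u8>) {
            self.kv.insert(Self::namespaced(column, key), value);
        }

        fn get(&self, column: &str, key: &[u8]) -> Option<&Vec<u8>> {
            self.kv.get(&Self::namespaced(column, key))
        }
    }

    fn main() {
        let mut store = Store { kv: HashMap::new() };
        store.put("pkc", &[0u8; 32], vec![1, 2, 3]);
        assert!(store.get("pkc", &[0u8; 32]).is_some());
        assert!(store.get("frk", &[0u8; 32]).is_none()); // other columns don't collide
    }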
diff --git a/beacon_node/store/src/schema_change.rs b/beacon_node/store/src/schema_change.rs
deleted file mode 100644
index ad89e47be..000000000
--- a/beacon_node/store/src/schema_change.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-//! Utilities for managing database schema changes.
-use crate::hot_cold_store::{HotColdDB, HotColdDBError};
-use crate::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
-use crate::{Error, ItemStore};
-use types::EthSpec;
-
-impl<E, Hot, Cold> HotColdDB<E, Hot, Cold>
-where
-    E: EthSpec,
-    Hot: ItemStore<E>,
-    Cold: ItemStore<E>,
-{
-    /// Migrate the database from one schema version to another, applying all requisite mutations.
-    pub fn migrate_schema(&self, from: SchemaVersion, to: SchemaVersion) -> Result<(), Error> {
-        match (from, to) {
-            // Migration from v0.3.0 to v0.3.x, adding the temporary states column.
-            // Nothing actually needs to be done, but once a DB uses v2 it shouldn't go back.
-            (SchemaVersion(1), SchemaVersion(2)) => {
-                self.store_schema_version(to)?;
-                Ok(())
-            }
-            // Migrating from the current schema version to itself is always OK, a no-op.
-            (_, _) if from == to && to == CURRENT_SCHEMA_VERSION => Ok(()),
-            // Anything else is an error.
-            (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
-                target_version: to,
-                current_version: from,
-            }
-            .into()),
-        }
-    }
-}