diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs
index c036dfe45..280ec3fac 100644
--- a/beacon_node/beacon_chain/src/errors.rs
+++ b/beacon_node/beacon_chain/src/errors.rs
@@ -91,7 +91,7 @@ pub enum BeaconChainError {
     BlockSignatureVerifierError(state_processing::block_signature_verifier::Error),
     BlockReplayError(BlockReplayError),
     DuplicateValidatorPublicKey,
-    ValidatorPubkeyCacheFileError(String),
+    ValidatorPubkeyCacheError(String),
     ValidatorIndexUnknown(usize),
     ValidatorPubkeyUnknown(PublicKeyBytes),
     OpPoolError(OpPoolError),
diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs
index 83e0cdd11..8fb4f82be 100644
--- a/beacon_node/beacon_chain/src/schema_change.rs
+++ b/beacon_node/beacon_chain/src/schema_change.rs
@@ -5,22 +5,14 @@ mod migration_schema_v8;
 mod migration_schema_v9;
 mod types;
 
-use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY};
+use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY};
 use crate::persisted_fork_choice::{PersistedForkChoiceV1, PersistedForkChoiceV7};
-use crate::validator_pubkey_cache::ValidatorPubkeyCache;
-use operation_pool::{PersistedOperationPool, PersistedOperationPoolBase};
 use slog::{warn, Logger};
-use ssz::{Decode, Encode};
-use ssz_derive::{Decode, Encode};
-use std::fs;
 use std::path::Path;
 use std::sync::Arc;
-use store::config::OnDiskStoreConfig;
 use store::hot_cold_store::{HotColdDB, HotColdDBError};
-use store::metadata::{SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION};
-use store::{DBColumn, Error as StoreError, ItemStore, StoreItem};
-
-const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz";
+use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
+use store::{Error as StoreError, StoreItem};
 
 /// Migrate the database from one schema version to another, applying all requisite mutations.
 pub fn migrate_schema<T: BeaconChainTypes>(
@@ -39,69 +31,11 @@ pub fn migrate_schema<T: BeaconChainTypes>(
             migrate_schema::<T>(db.clone(), datadir, from, next, log.clone())?;
             migrate_schema::<T>(db, datadir, next, to, log)
         }
-        // Migration from v0.3.0 to v0.3.x, adding the temporary states column.
-        // Nothing actually needs to be done, but once a DB uses v2 it shouldn't go back.
-        (SchemaVersion(1), SchemaVersion(2)) => {
-            db.store_schema_version(to)?;
-            Ok(())
-        }
-        // Migration for removing the pubkey cache.
-        (SchemaVersion(2), SchemaVersion(3)) => {
-            let pk_cache_path = datadir.join(PUBKEY_CACHE_FILENAME);
-
-            // Load from file, store to DB.
-            ValidatorPubkeyCache::<T>::load_from_file(&pk_cache_path)
-                .and_then(|cache| ValidatorPubkeyCache::convert(cache, db.clone()))
-                .map_err(|e| StoreError::SchemaMigrationError(format!("{:?}", e)))?;
+        //
+        // Migrations from before SchemaVersion(5) are deprecated.
+        //
-
-            db.store_schema_version(to)?;
-
-            // Delete cache file now that keys are stored in the DB.
-            fs::remove_file(&pk_cache_path).map_err(|e| {
-                StoreError::SchemaMigrationError(format!(
-                    "unable to delete {}: {:?}",
-                    pk_cache_path.display(),
-                    e
-                ))
-            })?;
-
-            Ok(())
-        }
-        // Migration for adding sync committee contributions to the persisted op pool.
-        (SchemaVersion(3), SchemaVersion(4)) => {
-            // Deserialize from what exists in the database using the `PersistedOperationPoolBase`
-            // variant and convert it to the Altair variant.
-            let pool_opt = db
-                .get_item::<PersistedOperationPoolBase<T::EthSpec>>(&OP_POOL_DB_KEY)?
-                .map(PersistedOperationPool::Base)
-                .map(PersistedOperationPool::base_to_altair);
-
-            if let Some(pool) = pool_opt {
-                // Store the converted pool under the same key.
-                db.put_item::<PersistedOperationPool<T::EthSpec>>(&OP_POOL_DB_KEY, &pool)?;
-            }
-
-            db.store_schema_version(to)?;
-
-            Ok(())
-        }
-        // Migration for weak subjectivity sync support and clean up of `OnDiskStoreConfig` (#1784).
-        (SchemaVersion(4), SchemaVersion(5)) => {
-            if let Some(OnDiskStoreConfigV4 {
-                slots_per_restore_point,
-                ..
-            }) = db.hot_db.get(&CONFIG_KEY)?
-            {
-                let new_config = OnDiskStoreConfig {
-                    slots_per_restore_point,
-                };
-                db.hot_db.put(&CONFIG_KEY, &new_config)?;
-            }
-
-            db.store_schema_version(to)?;
-
-            Ok(())
-        }
         // Migration for adding `execution_status` field to the fork choice store.
         (SchemaVersion(5), SchemaVersion(6)) => {
             // Database operations to be done atomically
@@ -201,24 +135,3 @@ pub fn migrate_schema<T: BeaconChainTypes>(
         .into()),
     }
 }
-
-// Store config used in v4 schema and earlier.
-#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
-pub struct OnDiskStoreConfigV4 {
-    pub slots_per_restore_point: u64,
-    pub _block_cache_size: usize,
-}
-
-impl StoreItem for OnDiskStoreConfigV4 {
-    fn db_column() -> DBColumn {
-        DBColumn::BeaconMeta
-    }
-
-    fn as_store_bytes(&self) -> Vec<u8> {
-        self.as_ssz_bytes()
-    }
-
-    fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
-        Ok(Self::from_ssz_bytes(bytes)?)
-    }
-}
diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs
index 769d66cd1..beb8da8b6 100644
--- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs
+++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs
@@ -1,11 +1,8 @@
 use crate::errors::BeaconChainError;
 use crate::{BeaconChainTypes, BeaconStore};
-use ssz::{Decode, DecodeError, Encode};
+use ssz::{Decode, Encode};
 use std::collections::HashMap;
 use std::convert::TryInto;
-use std::fs::File;
-use std::io::{self, Read, Write};
-use std::path::Path;
 use store::{DBColumn, Error as StoreError, StoreItem};
 use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};
 
@@ -24,15 +21,7 @@ pub struct ValidatorPubkeyCache<T: BeaconChainTypes> {
     pubkeys: Vec<PublicKey>,
     indices: HashMap<PublicKeyBytes, usize>,
     pubkey_bytes: Vec<PublicKeyBytes>,
-    backing: PubkeyCacheBacking<T>,
-}
-
-/// Abstraction over on-disk backing.
-///
-/// `File` backing is legacy, `Database` is current.
-enum PubkeyCacheBacking<T: BeaconChainTypes> {
-    File(ValidatorPubkeyCacheFile),
-    Database(BeaconStore<T>),
+    store: BeaconStore<T>,
 }
 
 impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
@@ -48,7 +37,7 @@
             pubkeys: vec![],
             indices: HashMap::new(),
             pubkey_bytes: vec![],
-            backing: PubkeyCacheBacking::Database(store),
+            store,
         };
 
         cache.import_new_pubkeys(state)?;
@@ -66,7 +55,9 @@
             if let Some(DatabasePubkey(pubkey)) =
                 store.get_item(&DatabasePubkey::key_for_index(validator_index))?
             {
-                pubkeys.push((&pubkey).try_into().map_err(Error::PubkeyDecode)?);
+                pubkeys.push((&pubkey).try_into().map_err(|e| {
+                    BeaconChainError::ValidatorPubkeyCacheError(format!("{:?}", e))
+                })?);
                 pubkey_bytes.push(pubkey);
                 indices.insert(pubkey, validator_index);
             } else {
@@ -78,31 +69,10 @@
             pubkeys,
             indices,
             pubkey_bytes,
-            backing: PubkeyCacheBacking::Database(store),
+            store,
         })
     }
 
-    /// DEPRECATED: used only for migration
-    pub fn load_from_file<P: AsRef<Path>>(path: P) -> Result<Self, BeaconChainError> {
-        ValidatorPubkeyCacheFile::open(&path)
-            .and_then(ValidatorPubkeyCacheFile::into_cache)
-            .map_err(Into::into)
-    }
-
-    /// Convert a cache using `File` backing to one using `Database` backing.
-    ///
-    /// This will write all of the keys from `existing_cache` to `store`.
-    pub fn convert(existing_cache: Self, store: BeaconStore<T>) -> Result<Self, BeaconChainError> {
-        let mut result = ValidatorPubkeyCache {
-            pubkeys: Vec::with_capacity(existing_cache.pubkeys.len()),
-            indices: HashMap::with_capacity(existing_cache.indices.len()),
-            pubkey_bytes: Vec::with_capacity(existing_cache.indices.len()),
-            backing: PubkeyCacheBacking::Database(store),
-        };
-        result.import(existing_cache.pubkeys.iter().map(PublicKeyBytes::from))?;
-        Ok(result)
-    }
-
     /// Scan the given `state` and add any new validator public keys.
     ///
     /// Does not delete any keys from `self` if they don't appear in `state`.
@@ -146,14 +116,8 @@
             // The motivation behind this ordering is that we do not want to have states that
             // reference a pubkey that is not in our cache. However, it's fine to have pubkeys
             // that are never referenced in a state.
-            match &mut self.backing {
-                PubkeyCacheBacking::File(persistence_file) => {
-                    persistence_file.append(i, &pubkey)?;
-                }
-                PubkeyCacheBacking::Database(store) => {
-                    store.put_item(&DatabasePubkey::key_for_index(i), &DatabasePubkey(pubkey))?;
-                }
-            }
+            self.store
+                .put_item(&DatabasePubkey::key_for_index(i), &DatabasePubkey(pubkey))?;
 
             self.pubkeys.push(
                 (&pubkey)
@@ -219,105 +183,6 @@ impl DatabasePubkey {
     }
 }
 
-/// Allows for maintaining an on-disk copy of the `ValidatorPubkeyCache`. The file is raw SSZ bytes
-/// (not ASCII encoded).
-///
-/// ## Writes
-///
-/// Each entry is simply appended to the file.
-///
-/// ## Reads
-///
-/// The whole file is parsed as an SSZ "variable list" of objects.
-///
-/// This parsing method is possible because the items in the list are fixed-length SSZ objects.
-struct ValidatorPubkeyCacheFile(File);
-
-#[derive(Debug)]
-enum Error {
-    Io(io::Error),
-    Ssz(DecodeError),
-    PubkeyDecode(bls::Error),
-    /// The file read from disk does not have a contiguous list of validator public keys. The file
-    /// has become corrupted.
-    InconsistentIndex {
-        _expected: Option<usize>,
-        _found: usize,
-    },
-}
-
-impl From<Error> for BeaconChainError {
-    fn from(e: Error) -> BeaconChainError {
-        BeaconChainError::ValidatorPubkeyCacheFileError(format!("{:?}", e))
-    }
-}
-
-impl ValidatorPubkeyCacheFile {
-    /// Opens an existing file for reading and writing.
-    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
-        File::options()
-            .read(true)
-            .write(true)
-            .create(false)
-            .append(true)
-            .open(path)
-            .map(Self)
-            .map_err(Error::Io)
-    }
-
-    /// Append a public key to file.
-    ///
-    /// The provided `index` should each be one greater than the previous and start at 0.
-    /// Otherwise, the file will become corrupted and unable to be converted into a cache .
-    pub fn append(&mut self, index: usize, pubkey: &PublicKeyBytes) -> Result<(), Error> {
-        append_to_file(&mut self.0, index, pubkey)
-    }
-
-    /// Creates a `ValidatorPubkeyCache` by reading and parsing the underlying file.
-    pub fn into_cache<T: BeaconChainTypes>(mut self) -> Result<ValidatorPubkeyCache<T>, Error> {
-        let mut bytes = vec![];
-        self.0.read_to_end(&mut bytes).map_err(Error::Io)?;
-
-        let list: Vec<(usize, PublicKeyBytes)> = Vec::from_ssz_bytes(&bytes).map_err(Error::Ssz)?;
-
-        let mut last = None;
-        let mut pubkeys = Vec::with_capacity(list.len());
-        let mut indices = HashMap::with_capacity(list.len());
-        let mut pubkey_bytes = Vec::with_capacity(list.len());
-
-        for (index, pubkey) in list {
-            let expected = last.map(|n| n + 1);
-            if expected.map_or(true, |expected| index == expected) {
-                last = Some(index);
-                pubkeys.push((&pubkey).try_into().map_err(Error::PubkeyDecode)?);
-                pubkey_bytes.push(pubkey);
-                indices.insert(pubkey, index);
-            } else {
-                return Err(Error::InconsistentIndex {
-                    _expected: expected,
-                    _found: index,
-                });
-            }
-        }
-
-        Ok(ValidatorPubkeyCache {
-            pubkeys,
-            indices,
-            pubkey_bytes,
-            backing: PubkeyCacheBacking::File(self),
-        })
-    }
-}
-
-fn append_to_file(file: &mut File, index: usize, pubkey: &PublicKeyBytes) -> Result<(), Error> {
-    let mut line = Vec::with_capacity(index.ssz_bytes_len() + pubkey.ssz_bytes_len());
-
-    index.ssz_append(&mut line);
-    pubkey.ssz_append(&mut line);
-
-    file.write_all(&line).map_err(Error::Io)
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
@@ -325,10 +190,7 @@ mod test {
     use logging::test_logger;
     use std::sync::Arc;
     use store::HotColdDB;
-    use tempfile::tempdir;
-    use types::{
-        test_utils::generate_deterministic_keypair, BeaconState, EthSpec, Keypair, MainnetEthSpec,
-    };
+    use types::{BeaconState, EthSpec, Keypair, MainnetEthSpec};
 
     type E = MainnetEthSpec;
     type T = EphemeralHarnessType<E>;
@@ -422,7 +284,7 @@ mod test {
         check_cache_get(&cache, &keypairs[..]);
         drop(cache);
 
-        // Re-init the cache from the file.
+        // Re-init the cache from the store.
         let mut cache =
             ValidatorPubkeyCache::load_from_store(store.clone()).expect("should open cache");
         check_cache_get(&cache, &keypairs[..]);
@@ -435,36 +297,8 @@ mod test {
         check_cache_get(&cache, &keypairs[..]);
         drop(cache);
 
-        // Re-init the cache from the file.
+        // Re-init the cache from the store.
         let cache = ValidatorPubkeyCache::load_from_store(store).expect("should open cache");
         check_cache_get(&cache, &keypairs[..]);
     }
-
-    #[test]
-    fn invalid_persisted_file() {
-        let dir = tempdir().expect("should create tempdir");
-        let path = dir.path().join("cache.ssz");
-        let pubkey = generate_deterministic_keypair(0).pk.into();
-
-        let mut file = File::create(&path).expect("should create file");
-        append_to_file(&mut file, 0, &pubkey).expect("should write to file");
-        drop(file);
-
-        let cache = ValidatorPubkeyCache::<T>::load_from_file(&path).expect("should open cache");
-        drop(cache);
-
-        let mut file = File::options()
-            .write(true)
-            .append(true)
-            .open(&path)
-            .expect("should open file");
-
-        append_to_file(&mut file, 42, &pubkey).expect("should write bad data to file");
-        drop(file);
-
-        assert!(
-            ValidatorPubkeyCache::<T>::load_from_file(&path).is_err(),
-            "should not parse invalid file"
-        );
-    }
 }
diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs
index eef09631e..70eb31cd0 100644
--- a/beacon_node/operation_pool/src/lib.rs
+++ b/beacon_node/operation_pool/src/lib.rs
@@ -8,9 +8,7 @@ mod sync_aggregate_id;
 
 pub use attestation::AttMaxCover;
 pub use max_cover::MaxCover;
-pub use persistence::{
-    PersistedOperationPool, PersistedOperationPoolAltair, PersistedOperationPoolBase,
-};
+pub use persistence::{PersistedOperationPool, PersistedOperationPoolAltair};
 
 use crate::sync_aggregate_id::SyncAggregateId;
 use attestation_id::AttestationId;
diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs
index acab2db60..076978609 100644
--- a/beacon_node/operation_pool/src/persistence.rs
+++ b/beacon_node/operation_pool/src/persistence.rs
@@ -17,7 +17,7 @@ type PersistedSyncContributions<T> = Vec<(SyncAggregateId, Vec<SyncCommitteeContribution<T>>)>;
 }
 
 impl<T: EthSpec> PersistedOperationPool<T> {
-    /// Convert an `OperationPool` into serializable form. Always converts to
-    /// `PersistedOperationPool::Altair` because the v3 to v4 database schema migration ensures
-    /// the op pool is always persisted as the Altair variant.
+    /// Convert an `OperationPool` into serializable form.
     pub fn from_operation_pool(operation_pool: &OperationPool<T>) -> Self {
         let attestations = operation_pool
             .attestations
@@ -114,14 +112,6 @@ impl<T: EthSpec> PersistedOperationPool<T> {
             .collect(),
         );
         let op_pool = match self {
-            PersistedOperationPool::Base(_) => OperationPool {
-                attestations,
-                sync_contributions: <_>::default(),
-                attester_slashings,
-                proposer_slashings,
-                voluntary_exits,
-                _phantom: Default::default(),
-            },
             PersistedOperationPool::Altair(_) => {
                 let sync_contributions =
                     RwLock::new(self.sync_contributions()?.iter().cloned().collect());
@@ -138,44 +128,9 @@ impl<T: EthSpec> PersistedOperationPool<T> {
         };
         Ok(op_pool)
     }
-
-    /// Convert the `PersistedOperationPool::Base` variant to `PersistedOperationPool::Altair` by
-    /// setting `sync_contributions` to its default.
-    pub fn base_to_altair(self) -> Self {
-        match self {
-            PersistedOperationPool::Base(_) => {
-                PersistedOperationPool::Altair(PersistedOperationPoolAltair {
-                    attestations: self.attestations().to_vec(),
-                    sync_contributions: <_>::default(),
-                    attester_slashings: self.attester_slashings().to_vec(),
-                    proposer_slashings: self.proposer_slashings().to_vec(),
-                    voluntary_exits: self.voluntary_exits().to_vec(),
-                })
-            }
-            PersistedOperationPool::Altair(_) => self,
-        }
-    }
 }
 
-/// This `StoreItem` implementation is necessary for migrating the `PersistedOperationPool`
-/// in the v3 to v4 database schema migration.
-impl<T: EthSpec> StoreItem for PersistedOperationPoolBase<T> {
-    fn db_column() -> DBColumn {
-        DBColumn::OpPool
-    }
-
-    fn as_store_bytes(&self) -> Vec<u8> {
-        self.as_ssz_bytes()
-    }
-
-    fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
-        Self::from_ssz_bytes(bytes).map_err(Into::into)
-    }
-}
-
-/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::Altair`
-/// because the v3 to v4 database schema migration ensures the persisted op pool is always stored
-/// in the Altair format.
+/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::Altair`.
 impl<T: EthSpec> StoreItem for PersistedOperationPool<T> {
     fn db_column() -> DBColumn {
         DBColumn::OpPool
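
Note on the dispatch that makes these deletions safe: the `migrate_schema` match retained in `schema_change.rs` upgrades across multiple versions by recursing one schema step at a time, so only adjacent-version arms ever need to exist. Removing the pre-v5 arms therefore means a v1 to v4 database presumably falls through to the function's catch-all error arm instead of silently migrating. A minimal standalone sketch of that pattern follows; `SchemaVersion` here is a simplified stand-in for `store::metadata::SchemaVersion`, and `MigrationError` is a hypothetical error type, not Lighthouse's `HotColdDBError`:

    // Sketch only: simplified stand-ins for the real types. The real function
    // also threads the database handle, datadir and logger through each call.
    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    pub struct SchemaVersion(pub u64);

    #[derive(Debug)]
    pub enum MigrationError {
        Unsupported {
            from: SchemaVersion,
            to: SchemaVersion,
        },
    }

    pub fn migrate_schema(from: SchemaVersion, to: SchemaVersion) -> Result<(), MigrationError> {
        match (from, to) {
            // Migrating to the current version is a no-op.
            (_, _) if from == to => Ok(()),
            // Decompose a multi-version upgrade into single steps, recursively,
            // so only adjacent-version arms ever need to be written.
            (_, _) if from.0 + 1 < to.0 => {
                let next = SchemaVersion(from.0 + 1);
                migrate_schema(from, next)?;
                migrate_schema(next, to)
            }
            // Supported single-step migrations (v5 and later) live here.
            (SchemaVersion(5), SchemaVersion(6)) => {
                // ... apply the v5 -> v6 mutations and persist the new version ...
                Ok(())
            }
            // Everything else, including the deprecated pre-v5 steps deleted
            // in this diff, is an explicit error rather than a silent no-op.
            (_, _) => Err(MigrationError::Unsupported { from, to }),
        }
    }

Under this shape, adding a new schema version only requires one new adjacent-version arm; multi-step upgrades compose automatically, and an attempt to upgrade a pre-v5 database surfaces as an explicit unsupported-migration error.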