diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 143f07e0d..1dac8b296 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -44,7 +44,7 @@ use std::io::prelude::*;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
-use store::{Error as DBError, HotColdDB};
+use store::{Error as DBError, HotColdDB, StoreOp};
 use types::*;
 
 pub type ForkChoiceError = fork_choice::Error;
@@ -1422,8 +1422,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         let block_root = fully_verified_block.block_root;
         let state = fully_verified_block.state;
         let parent_block = fully_verified_block.parent_block;
-        let intermediate_states = fully_verified_block.intermediate_states;
         let current_slot = self.slot()?;
+        let mut ops = fully_verified_block.intermediate_states;
 
         let attestation_observation_timer =
             metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION);
@@ -1515,18 +1515,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
 
         let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);
 
-        // Store all the states between the parent block state and this block's slot before storing
-        // the final state.
-        intermediate_states.commit(&*self.store)?;
-
-        // Store the block and state.
-        // NOTE: we store the block *after* the state to guard against inconsistency in the event of
-        // a crash, as states are usually looked up from blocks, not the other way around. A better
-        // solution would be to use a database transaction (once our choice of database and API
-        // settles down).
-        // See: https://github.com/sigp/lighthouse/issues/692
-        self.store.put_state(&block.state_root, &state)?;
-        self.store.put_block(&block_root, signed_block.clone())?;
+        // Store the block, its state, and all skipped-slot intermediate states in one atomic batch.
+        ops.push(StoreOp::PutBlock(block_root.into(), signed_block.clone()));
+        ops.push(StoreOp::PutState(
+            block.state_root.into(),
+            Cow::Borrowed(&state),
+        ));
+        self.store.do_atomically(ops)?;
 
         // The fork choice write-lock is dropped *after* the on-disk database has been updated.
         // This prevents inconsistency between the two at the expense of concurrency.
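The hunk above is the core behavioural change: the removed NOTE relied on write *ordering* (state before block) to limit damage from a crash, per issue #692; the replacement removes the inconsistency window entirely by staging the block, its post-state, and the skipped-slot states into one `do_atomically` batch. A rough self-contained sketch of the invariant this buys (the types here are simplified stand-ins, not Lighthouse's `StoreOp`/`HotColdDB`):

```rust
use std::collections::HashMap;

// Stand-in for `StoreOp`: high-level writes that must land together.
enum Op {
    PutBlock(u64, &'static str),
    PutState(u64, &'static str),
}

#[derive(Default)]
struct Db {
    blocks: HashMap<u64, &'static str>,
    states: HashMap<u64, &'static str>,
}

impl Db {
    // Models the contract of `HotColdDB::do_atomically`: the caller hands
    // over the whole batch and the backend commits it as a unit (the real
    // store lowers everything into a single LevelDB write batch).
    fn do_atomically(&mut self, batch: Vec<Op>) {
        for op in batch {
            match op {
                Op::PutBlock(root, block) => {
                    self.blocks.insert(root, block);
                }
                Op::PutState(root, state) => {
                    self.states.insert(root, state);
                }
            }
        }
    }
}

fn main() {
    let mut db = Db::default();
    db.do_atomically(vec![Op::PutBlock(1, "block"), Op::PutState(1, "state")]);
    // The invariant the batching buys: a block and its post-state are
    // either both present or both absent.
    assert_eq!(db.blocks.contains_key(&1), db.states.contains_key(&1));
}
```

In the real diff the batch is committed against the hot database, and blocks and states are keyed by the `SignedBeaconBlockHash`/`BeaconStateHash` newtypes rather than integers.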
diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs
index 0467009aa..fa4e0d6a9 100644
--- a/beacon_node/beacon_chain/src/block_verification.rs
+++ b/beacon_node/beacon_chain/src/block_verification.rs
@@ -62,7 +62,7 @@ use std::borrow::Cow;
 use std::convert::TryFrom;
 use std::fs;
 use std::io::Write;
-use store::{Error as DBError, StateBatch};
+use store::{Error as DBError, HotStateSummary, StoreOp};
 use tree_hash::TreeHash;
 use types::{
     BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256,
@@ -263,12 +263,12 @@ pub struct SignatureVerifiedBlock<T: BeaconChainTypes> {
 /// Note: a `FullyVerifiedBlock` is not _forever_ valid to be imported, it may later become invalid
 /// due to finality or some other event. A `FullyVerifiedBlock` should be imported into the
 /// `BeaconChain` immediately after it is instantiated.
-pub struct FullyVerifiedBlock<T: BeaconChainTypes> {
+pub struct FullyVerifiedBlock<'a, T: BeaconChainTypes> {
     pub block: SignedBeaconBlock<T::EthSpec>,
     pub block_root: Hash256,
     pub state: BeaconState<T::EthSpec>,
     pub parent_block: SignedBeaconBlock<T::EthSpec>,
-    pub intermediate_states: StateBatch<T::EthSpec>,
+    pub intermediate_states: Vec<StoreOp<'a, T::EthSpec>>,
 }
 
 /// Implemented on types that can be converted into a `FullyVerifiedBlock`.
@@ -506,7 +506,7 @@ impl<T: BeaconChainTypes> IntoFullyVerifiedBlock<T> for SignedBeaconBlock<T::EthSpec>
 
-impl<T: BeaconChainTypes> FullyVerifiedBlock<T> {
+impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
     /// Instantiates `Self`, a wrapper that indicates that the given `block` is fully valid. See
     /// the struct-level documentation for more information.
     ///
@@ -552,7 +552,7 @@ impl<T: BeaconChainTypes> FullyVerifiedBlock<T> {
 
         // Keep a batch of any states that were "skipped" (block-less) in between the parent state
         // slot and the block slot. These will be stored in the database.
-        let mut intermediate_states = StateBatch::new();
+        let mut intermediate_states: Vec<StoreOp<T::EthSpec>> = Vec::new();
 
         // The block must have a higher slot than its parent.
         if block.slot() <= parent.beacon_state.slot {
@@ -575,7 +575,16 @@ impl<T: BeaconChainTypes> FullyVerifiedBlock<T> {
             // Computing the state root here is time-equivalent to computing it during slot
             // processing, but we get early access to it.
             let state_root = state.update_tree_hash_cache()?;
-            intermediate_states.add_state(state_root, &state)?;
+
+            let op = if state.slot % T::EthSpec::slots_per_epoch() == 0 {
+                StoreOp::PutState(state_root.into(), Cow::Owned(state.clone()))
+            } else {
+                StoreOp::PutStateSummary(
+                    state_root.into(),
+                    HotStateSummary::new(&state_root, &state)?,
+                )
+            };
+            intermediate_states.push(op);
 
             state_root
         };
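The `slots_per_epoch` branch added here reproduces the policy of the deleted `StateBatch::add_state` (see the removal of `state_batch.rs` at the end of this diff): full `BeaconState` writes only on epoch boundaries, cheap `HotStateSummary` ops everywhere else. A minimal self-contained sketch of that decision, with `SLOTS_PER_EPOCH` standing in for `T::EthSpec::slots_per_epoch()`:

```rust
// Stand-in sketch of the epoch-boundary test above.
const SLOTS_PER_EPOCH: u64 = 32;

#[derive(Debug, PartialEq)]
enum Op {
    FullState(u64), // models `StoreOp::PutState`
    Summary(u64),   // models `StoreOp::PutStateSummary`
}

fn op_for_slot(slot: u64) -> Op {
    if slot % SLOTS_PER_EPOCH == 0 {
        Op::FullState(slot)
    } else {
        Op::Summary(slot)
    }
}

fn main() {
    assert_eq!(op_for_slot(64), Op::FullState(64)); // epoch boundary
    assert_eq!(op_for_slot(65), Op::Summary(65)); // skipped intermediate slot
}
```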
diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs
index 7291a3ffc..1ad83b633 100644
--- a/beacon_node/beacon_chain/src/migrate.rs
+++ b/beacon_node/beacon_chain/src/migrate.rs
@@ -152,7 +152,7 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
             }
         }
 
-        let batch: Vec<StoreOp> = abandoned_blocks
+        let batch: Vec<StoreOp<E>> = abandoned_blocks
             .into_iter()
             .map(|block_hash| StoreOp::DeleteBlock(block_hash))
             .chain(
@@ -161,7 +161,7 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
                     .map(|(slot, state_hash)| StoreOp::DeleteState(state_hash, slot)),
             )
             .collect();
-        store.do_atomically(&batch)?;
+        store.do_atomically(batch)?;
         for head_hash in abandoned_heads.into_iter() {
             head_tracker.remove_head(head_hash);
         }
diff --git a/beacon_node/store/src/chunked_vector.rs b/beacon_node/store/src/chunked_vector.rs
index eec16ba55..8f5b09deb 100644
--- a/beacon_node/store/src/chunked_vector.rs
+++ b/beacon_node/store/src/chunked_vector.rs
@@ -195,6 +195,7 @@ pub trait Field<E: EthSpec>: Copy {
     fn check_and_store_genesis_value<S: KeyValueStore<E>>(
         store: &S,
         value: Self::Value,
+        ops: &mut Vec<KeyValueStoreOp>,
     ) -> Result<(), Error> {
         let key = &genesis_value_key()[..];
 
@@ -217,7 +218,9 @@ pub trait Field<E: EthSpec>: Copy {
                 Ok(())
             }
         } else {
-            Chunk::new(vec![value]).store(store, Self::column(), &genesis_value_key()[..])
+            let chunk = Chunk::new(vec![value]);
+            chunk.store(Self::column(), &genesis_value_key()[..], ops)?;
+            Ok(())
         }
     }
 
@@ -332,6 +335,7 @@ pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
     store: &S,
     state: &BeaconState<E>,
     spec: &ChainSpec,
+    ops: &mut Vec<KeyValueStoreOp>,
 ) -> Result<(), Error> {
     let chunk_size = F::chunk_size();
     let (start_vindex, end_vindex) = F::start_and_end_vindex(state.slot, spec);
@@ -341,7 +345,7 @@ pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
     // Store the genesis value if we have access to it, and it hasn't been stored already.
     if F::slot_needs_genesis_value(state.slot, spec) {
         let genesis_value = F::extract_genesis_value(state, spec)?;
-        F::check_and_store_genesis_value(store, genesis_value)?;
+        F::check_and_store_genesis_value(store, genesis_value, ops)?;
     }
 
     // Start by iterating backwards from the last chunk, storing new chunks in the database.
@@ -355,6 +359,7 @@ pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
         store,
         state,
         spec,
+        ops,
     )?;
 
     // If the previous `store_range` did not check the entire range, it may be the case that the
@@ -369,6 +374,7 @@ pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
             store,
             state,
            spec,
+            ops,
         )?;
     }
 
@@ -383,6 +389,7 @@ fn store_range<F, E, S, I>(
     store: &S,
     state: &BeaconState<E>,
     spec: &ChainSpec,
+    ops: &mut Vec<KeyValueStoreOp>,
 ) -> Result<bool, Error>
 where
     F: Field<E>,
@@ -409,7 +416,7 @@ where
             return Ok(false);
         }
 
-        new_chunk.store(store, F::column(), chunk_key)?;
+        new_chunk.store(F::column(), chunk_key, ops)?;
     }
 
     Ok(true)
@@ -585,13 +592,14 @@ where
             .transpose()
     }
 
-    pub fn store<S: KeyValueStore<E>, E: EthSpec>(
+    pub fn store(
         &self,
-        store: &S,
         column: DBColumn,
         key: &[u8],
+        ops: &mut Vec<KeyValueStoreOp>,
     ) -> Result<(), Error> {
-        store.put_bytes(column.into(), key, &self.encode()?)?;
+        let db_key = get_key_for_col(column.into(), key);
+        ops.push(KeyValueStoreOp::PutKeyValue(db_key, self.encode()?));
         Ok(())
     }
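The `chunked_vector.rs` changes are mechanical, but they all apply one pattern: functions that previously wrote through a `&S: KeyValueStore` now append staged writes to a caller-supplied `&mut Vec<KeyValueStoreOp>`, and only the top-level caller decides when the whole vector hits the database. A stand-in sketch of the pattern (`KvOp` is a local stand-in for `KeyValueStoreOp`, not the store crate's type):

```rust
// Local stand-in for `KeyValueStoreOp::PutKeyValue`.
enum KvOp {
    Put(Vec<u8>, Vec<u8>),
}

// Like `Chunk::store` after this diff: stage the write, touch no database.
fn store_chunk(key: &[u8], bytes: Vec<u8>, ops: &mut Vec<KvOp>) {
    ops.push(KvOp::Put(key.to_vec(), bytes));
}

// Like `store_updated_vector`: thread the same `ops` into every helper.
fn store_many(ops: &mut Vec<KvOp>) {
    store_chunk(b"chunk-0", vec![0u8; 8], ops);
    store_chunk(b"chunk-1", vec![1u8; 8], ops);
}

fn main() {
    let mut ops = Vec::new();
    store_many(&mut ops);
    // A real caller would now hand `ops` to a single `do_atomically` call.
    assert_eq!(ops.len(), 2);
}
```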
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 73ddad436..0d96be25e 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -87,6 +87,62 @@ pub enum HotColdDBError {
     RestorePointBlockHashError(BeaconStateError),
 }
 
+impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
+    pub fn open_ephemeral(
+        config: StoreConfig,
+        spec: ChainSpec,
+        log: Logger,
+    ) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
+        Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
+
+        let db = HotColdDB {
+            split: RwLock::new(Split::default()),
+            cold_db: MemoryStore::open(),
+            hot_db: MemoryStore::open(),
+            block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
+            config,
+            spec,
+            log,
+            _phantom: PhantomData,
+        };
+
+        Ok(db)
+    }
+}
+
+impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
+    /// Open a new or existing database, with the given paths to the hot and cold DBs.
+    ///
+    /// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`.
+    pub fn open(
+        hot_path: &Path,
+        cold_path: &Path,
+        config: StoreConfig,
+        spec: ChainSpec,
+        log: Logger,
+    ) -> Result<HotColdDB<E, LevelDB<E>, LevelDB<E>>, Error> {
+        Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
+
+        let db = HotColdDB {
+            split: RwLock::new(Split::default()),
+            cold_db: LevelDB::open(cold_path)?,
+            hot_db: LevelDB::open(hot_path)?,
+            block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
+            config,
+            spec,
+            log,
+            _phantom: PhantomData,
+        };
+
+        // Load the previous split slot from the database (if any). This ensures we can
+        // stop and restart correctly.
+        if let Some(split) = db.load_split()? {
+            *db.split.write() = split;
+        }
+
+        Ok(db)
+    }
+}
+
 impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
     /// Store a block and update the LRU cache.
     pub fn put_block(
@@ -141,9 +197,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     /// Store a state in the store.
     pub fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
         if state.slot < self.get_split_slot() {
-            self.store_cold_state(state_root, &state)
+            let mut ops: Vec<KeyValueStoreOp> = Vec::new();
+            self.store_cold_state(state_root, &state, &mut ops)?;
+            self.cold_db.do_atomically(ops)
         } else {
-            self.store_hot_state(state_root, state)
+            let mut ops: Vec<KeyValueStoreOp> = Vec::new();
+            self.store_hot_state(state_root, state, &mut ops)?;
+            self.hot_db.do_atomically(ops)
         }
     }
 
@@ -243,12 +303,27 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         self.hot_db.exists::<I>(key)
     }
 
-    pub fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
+    pub fn do_atomically(&self, batch: Vec<StoreOp<E>>) -> Result<(), Error> {
         let mut guard = self.block_cache.lock();
 
         let mut key_value_batch: Vec<KeyValueStoreOp> = Vec::with_capacity(batch.len());
-        for op in batch {
+        for op in &batch {
             match op {
+                StoreOp::PutBlock(block_hash, block) => {
+                    let untyped_hash: Hash256 = (*block_hash).into();
+                    key_value_batch.push(block.as_kv_store_op(untyped_hash));
+                }
+
+                StoreOp::PutState(state_hash, state) => {
+                    let untyped_hash: Hash256 = (*state_hash).into();
+                    self.store_hot_state(&untyped_hash, state, &mut key_value_batch)?;
+                }
+
+                StoreOp::PutStateSummary(state_hash, summary) => {
+                    let untyped_hash: Hash256 = (*state_hash).into();
+                    key_value_batch.push(summary.as_kv_store_op(untyped_hash));
+                }
+
                 StoreOp::DeleteBlock(block_hash) => {
                     let untyped_hash: Hash256 = (*block_hash).into();
                     let key =
@@ -272,78 +347,29 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 }
             }
         }
-        self.hot_db.do_atomically(&key_value_batch)?;
+        self.hot_db.do_atomically(key_value_batch)?;
 
-        for op in batch {
+        for op in &batch {
             match op {
+                StoreOp::PutBlock(block_hash, block) => {
+                    let untyped_hash: Hash256 = (*block_hash).into();
+                    guard.put(untyped_hash, block.clone());
+                }
+
+                StoreOp::PutState(_, _) => (),
+
+                StoreOp::PutStateSummary(_, _) => (),
+
                 StoreOp::DeleteBlock(block_hash) => {
                     let untyped_hash: Hash256 = (*block_hash).into();
                     guard.pop(&untyped_hash);
                 }
+
                 StoreOp::DeleteState(_, _) => (),
             }
         }
 
         Ok(())
     }
-}
-
-impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
-    pub fn open_ephemeral(
-        config: StoreConfig,
-        spec: ChainSpec,
-        log: Logger,
-    ) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
-        Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
-
-        let db = HotColdDB {
-            split: RwLock::new(Split::default()),
-            cold_db: MemoryStore::open(),
-            hot_db: MemoryStore::open(),
-            block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
-            config,
-            spec,
-            log,
-            _phantom: PhantomData,
-        };
-
-        Ok(db)
-    }
-}
-
-impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
-    /// Open a new or existing database, with the given paths to the hot and cold DBs.
-    ///
-    /// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`.
-    pub fn open(
-        hot_path: &Path,
-        cold_path: &Path,
-        config: StoreConfig,
-        spec: ChainSpec,
-        log: Logger,
-    ) -> Result<HotColdDB<E, LevelDB<E>, LevelDB<E>>, Error> {
-        Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
-
-        let db = HotColdDB {
-            split: RwLock::new(Split::default()),
-            cold_db: LevelDB::open(cold_path)?,
-            hot_db: LevelDB::open(hot_path)?,
-            block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
-            config,
-            spec,
-            log,
-            _phantom: PhantomData,
-        };
-
-        // Load the previous split slot from the database (if any). This ensures we can
-        // stop and restart correctly.
-        if let Some(split) = db.load_split()? {
-            *db.split.write() = split;
-        }
-
-        Ok(db)
-    }
-}
-
-impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
     /// Store a post-finalization state efficiently in the hot database.
     ///
     /// On an epoch boundary, store a full state. On an intermediate slot, store
@@ -352,6 +378,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         &self,
         state_root: &Hash256,
         state: &BeaconState<E>,
+        ops: &mut Vec<KeyValueStoreOp>,
     ) -> Result<(), Error> {
         // On the epoch boundary, store the full state.
         if state.slot % E::slots_per_epoch() == 0 {
@@ -361,13 +388,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 "slot" => state.slot.as_u64(),
                 "state_root" => format!("{:?}", state_root)
             );
-            store_full_state(&self.hot_db, state_root, &state)?;
+            store_full_state(state_root, &state, ops)?;
         }
 
         // Store a summary of the state.
         // We store one even for the epoch boundary states, as we may need their slots
         // when doing a look up by state root.
-        self.put_state_summary(state_root, HotStateSummary::new(state_root, state)?)?;
+        let hot_state_summary = HotStateSummary::new(state_root, state)?;
+        let op = hot_state_summary.as_kv_store_op(*state_root);
+        ops.push(op);
 
         Ok(())
     }
@@ -413,6 +442,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         &self,
         state_root: &Hash256,
         state: &BeaconState<E>,
+        ops: &mut Vec<KeyValueStoreOp>,
     ) -> Result<(), Error> {
         if state.slot % self.config.slots_per_restore_point != 0 {
             warn!(
@@ -433,18 +463,19 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
 
         // 1. Convert to PartialBeaconState and store that in the DB.
         let partial_state = PartialBeaconState::from_state_forgetful(state);
-        self.cold_db.put(state_root, &partial_state)?;
+        let op = partial_state.as_kv_store_op(*state_root);
+        ops.push(op);
 
         // 2. Store updated vector entries.
         let db = &self.cold_db;
-        store_updated_vector(BlockRoots, db, state, &self.spec)?;
-        store_updated_vector(StateRoots, db, state, &self.spec)?;
-        store_updated_vector(HistoricalRoots, db, state, &self.spec)?;
-        store_updated_vector(RandaoMixes, db, state, &self.spec)?;
+        store_updated_vector(BlockRoots, db, state, &self.spec, ops)?;
+        store_updated_vector(StateRoots, db, state, &self.spec, ops)?;
+        store_updated_vector(HistoricalRoots, db, state, &self.spec, ops)?;
+        store_updated_vector(RandaoMixes, db, state, &self.spec, ops)?;
 
         // 3. Store restore point.
         let restore_point_index = state.slot.as_u64() / self.config.slots_per_restore_point;
-        self.store_restore_point_hash(restore_point_index, *state_root)?;
+        self.store_restore_point_hash(restore_point_index, *state_root, ops);
 
         Ok(())
     }
@@ -666,11 +697,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         &self,
         restore_point_index: u64,
         state_root: Hash256,
-    ) -> Result<(), Error> {
-        let key = Self::restore_point_key(restore_point_index);
-        self.cold_db
-            .put(&key, &RestorePointHash { state_root })
-            .map_err(Into::into)
+        ops: &mut Vec<KeyValueStoreOp>,
+    ) {
+        let value = &RestorePointHash { state_root };
+        let op = value.as_kv_store_op(Self::restore_point_key(restore_point_index));
+        ops.push(op);
     }
 
     /// Convert a `restore_point_index` into a database key.
@@ -775,7 +806,9 @@ pub fn process_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
         let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root)?
             .ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?;
 
-        store.store_cold_state(&state_root, &state)?;
+        let mut ops: Vec<KeyValueStoreOp> = Vec::new();
+        store.store_cold_state(&state_root, &state, &mut ops)?;
+        store.cold_db.do_atomically(ops)?;
     }
 
     // Store a pointer from this state root to its slot, so we can later reconstruct states
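Note the two-phase shape of the rewritten `do_atomically` above: the first loop lowers each `StoreOp` to raw key-value ops (`PutState` is expanded via `store_hot_state`, so a full state and its summary land in the same batch) and commits them with a single `hot_db.do_atomically` call; the second loop updates the in-memory block cache only after that write succeeds, so the cache can never claim a block the database doesn't have. A stand-in model of that ordering (simplified types, not the real `HotColdDB`):

```rust
use std::collections::HashMap;

// Stand-in for `StoreOp`, reduced to the two cache-relevant variants.
enum Op {
    PutBlock(u64, Vec<u8>),
    DeleteBlock(u64),
}

#[derive(Default)]
struct Db {
    kv: HashMap<u64, Vec<u8>>,    // plays the role of `hot_db`
    cache: HashMap<u64, Vec<u8>>, // plays the role of `block_cache`
}

impl Db {
    fn do_atomically(&mut self, batch: Vec<Op>) -> Result<(), ()> {
        // Phase 1: lower and commit the durable writes as one unit.
        for op in &batch {
            match op {
                Op::PutBlock(root, block) => {
                    self.kv.insert(*root, block.clone());
                }
                Op::DeleteBlock(root) => {
                    self.kv.remove(root);
                }
            }
        }
        // Phase 2: only after the durable write completed, touch the cache.
        for op in &batch {
            match op {
                Op::PutBlock(root, block) => {
                    self.cache.insert(*root, block.clone());
                }
                Op::DeleteBlock(root) => {
                    self.cache.remove(root);
                }
            }
        }
        Ok(())
    }
}

fn main() {
    let mut db = Db::default();
    db.do_atomically(vec![Op::PutBlock(1, vec![0xff])]).unwrap();
    db.do_atomically(vec![Op::DeleteBlock(1)]).unwrap();
    assert!(db.kv.is_empty() && db.cache.is_empty());
}
```

In the real code an early `?` return from the key-value commit skips the cache loop entirely, which is exactly the property the two-loop layout is buying.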
diff --git a/beacon_node/store/src/impls/beacon_state.rs b/beacon_node/store/src/impls/beacon_state.rs
index 8b57f80e1..6cff12707 100644
--- a/beacon_node/store/src/impls/beacon_state.rs
+++ b/beacon_node/store/src/impls/beacon_state.rs
@@ -4,24 +4,20 @@ use ssz_derive::{Decode, Encode};
 use std::convert::TryInto;
 use types::beacon_state::{CloneConfig, CommitteeCache, CACHED_EPOCHS};
 
-pub fn store_full_state<KV: KeyValueStore<E>, E: EthSpec>(
-    store: &KV,
+pub fn store_full_state<E: EthSpec>(
     state_root: &Hash256,
     state: &BeaconState<E>,
+    ops: &mut Vec<KeyValueStoreOp>,
 ) -> Result<(), Error> {
-    let total_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_TIMES);
-    let overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_OVERHEAD_TIMES);
-
-    let bytes = StorageContainer::new(state).as_ssz_bytes();
-    metrics::stop_timer(overhead_timer);
-
-    let result = store.put_bytes(DBColumn::BeaconState.into(), state_root.as_bytes(), &bytes);
-
-    metrics::stop_timer(total_timer);
-    metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT);
+    let bytes = {
+        let _overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_OVERHEAD_TIMES);
+        StorageContainer::new(state).as_ssz_bytes()
+    };
     metrics::inc_counter_by(&metrics::BEACON_STATE_WRITE_BYTES, bytes.len() as i64);
-
-    result
+    metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT);
+    let key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes());
+    ops.push(KeyValueStoreOp::PutKeyValue(key, bytes));
+    Ok(())
 }
 
 pub fn get_full_state<KV: KeyValueStore<E>, E: EthSpec>(
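The metrics reshuffle here pairs with the `metrics.rs` hunk further down: since `store_full_state` now only stages a `PutKeyValue` op, a total write-time histogram at this level would no longer measure a database write, so `BEACON_STATE_WRITE_TIMES` is dropped and only the encoding-overhead timer survives, scoped to the encode block. A self-contained sketch of that scoped-guard pattern (`TimerGuard` is a stand-in, not Lighthouse's metrics API):

```rust
use std::time::Instant;

// Stand-in for a metrics timer guard: it measures only the block it lives
// in, and records on drop.
struct TimerGuard(Instant);

impl Drop for TimerGuard {
    fn drop(&mut self) {
        println!("encode took {:?}", self.0.elapsed());
    }
}

fn encode_state() -> Vec<u8> {
    vec![0u8; 1024] // placeholder for SSZ encoding
}

fn main() {
    let bytes = {
        let _overhead_timer = TimerGuard(Instant::now());
        encode_state()
    }; // guard drops here, exactly like `_overhead_timer` in the hunk
    assert_eq!(bytes.len(), 1024);
}
```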
diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs
index 6e3dc8bbe..4a7822b85 100644
--- a/beacon_node/store/src/leveldb_store.rs
+++ b/beacon_node/store/src/leveldb_store.rs
@@ -98,12 +98,16 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
             .map_err(Into::into)
     }
 
-    fn do_atomically(&self, ops_batch: &[KeyValueStoreOp]) -> Result<(), Error> {
+    fn do_atomically(&self, ops_batch: Vec<KeyValueStoreOp>) -> Result<(), Error> {
         let mut leveldb_batch = Writebatch::new();
-        for op in ops_batch.into_iter() {
+        for op in ops_batch {
             match op {
+                KeyValueStoreOp::PutKeyValue(key, value) => {
+                    leveldb_batch.put(BytesKey::from_vec(key), &value);
+                }
+
                 KeyValueStoreOp::DeleteKey(key) => {
-                    leveldb_batch.delete(BytesKey::from_vec(key.to_vec()));
+                    leveldb_batch.delete(BytesKey::from_vec(key));
                 }
             }
         }
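One detail worth noting in the `LevelDB` backend: `do_atomically` now takes the batch by value, which is what lets `BytesKey::from_vec(key)` consume each key directly instead of the old `key.to_vec()` copy per op. A stand-in illustration of that ownership change (local types, not the leveldb crate's `Writebatch`/`BytesKey`):

```rust
// Local stand-in for `KeyValueStoreOp`.
enum KvOp {
    Put(Vec<u8>, Vec<u8>),
    Delete(Vec<u8>),
}

struct Batch {
    staged: Vec<(Vec<u8>, Option<Vec<u8>>)>, // `None` models a delete
}

fn build_batch(ops: Vec<KvOp>) -> Batch {
    let mut staged = Vec::with_capacity(ops.len());
    for op in ops {
        match op {
            // The key is moved into the batch with no re-allocation, which
            // is what owning the ops (by-value `Vec`) buys over `&[KvOp]`.
            KvOp::Put(key, value) => staged.push((key, Some(value))),
            KvOp::Delete(key) => staged.push((key, None)),
        }
    }
    Batch { staged }
}

fn main() {
    let batch = build_batch(vec![
        KvOp::Put(b"a".to_vec(), b"1".to_vec()),
        KvOp::Delete(b"b".to_vec()),
    ]);
    assert_eq!(batch.staged.len(), 2);
}
```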
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index 1d0506ce3..f9f04b884 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -21,10 +21,11 @@ mod leveldb_store;
 mod memory_store;
 mod metrics;
 mod partial_beacon_state;
-mod state_batch;
 
 pub mod iter;
 
+use std::borrow::Cow;
+
 pub use self::config::StoreConfig;
 pub use self::hot_cold_store::{HotColdDB, HotStateSummary};
 pub use self::leveldb_store::LevelDB;
@@ -33,7 +34,6 @@ pub use self::partial_beacon_state::PartialBeaconState;
 pub use errors::Error;
 pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
 pub use metrics::scrape_for_metrics;
-pub use state_batch::StateBatch;
 pub use types::*;
 
 pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
@@ -50,7 +50,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
     fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error>;
 
     /// Execute either all of the operations in `batch` or none at all, returning an error.
-    fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error>;
+    fn do_atomically(&self, batch: Vec<KeyValueStoreOp>) -> Result<(), Error>;
 }
 
 pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
@@ -60,6 +60,7 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
 }
 
 pub enum KeyValueStoreOp {
+    PutKeyValue(Vec<u8>, Vec<u8>),
     DeleteKey(Vec<u8>),
 }
@@ -103,7 +104,11 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
 
 /// Reified key-value storage operation. Helps in modifying the storage atomically.
 /// See also https://github.com/sigp/lighthouse/issues/692
-pub enum StoreOp {
+#[allow(clippy::large_enum_variant)]
+pub enum StoreOp<'a, E: EthSpec> {
+    PutBlock(SignedBeaconBlockHash, SignedBeaconBlock<E>),
+    PutState(BeaconStateHash, Cow<'a, BeaconState<E>>),
+    PutStateSummary(BeaconStateHash, HotStateSummary),
     DeleteBlock(SignedBeaconBlockHash),
     DeleteState(BeaconStateHash, Slot),
 }
@@ -165,6 +170,11 @@ pub trait StoreItem: Sized {
     ///
     /// Return an instance of the type and the number of bytes that were read.
     fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error>;
+
+    fn as_kv_store_op(&self, key: Hash256) -> KeyValueStoreOp {
+        let db_key = get_key_for_col(Self::db_column().into(), key.as_bytes());
+        KeyValueStoreOp::PutKeyValue(db_key, self.as_store_bytes())
+    }
 }
 
 #[cfg(test)]
diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs
index c8556860c..30a0b1e0b 100644
--- a/beacon_node/store/src/memory_store.rs
+++ b/beacon_node/store/src/memory_store.rs
@@ -64,11 +64,15 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
         Ok(())
     }
 
-    fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error> {
+    fn do_atomically(&self, batch: Vec<KeyValueStoreOp>) -> Result<(), Error> {
         for op in batch {
             match op {
+                KeyValueStoreOp::PutKeyValue(key, value) => {
+                    self.db.write().insert(key, value);
+                }
+
                 KeyValueStoreOp::DeleteKey(hash) => {
-                    self.db.write().remove(hash);
+                    self.db.write().remove(&hash);
                 }
             }
         }
diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs
index 2f821f033..826712a72 100644
--- a/beacon_node/store/src/metrics.rs
+++ b/beacon_node/store/src/metrics.rs
@@ -78,10 +78,6 @@ lazy_static! {
         "store_beacon_state_read_bytes_total",
         "Total number of beacon state bytes read from the DB"
     );
-    pub static ref BEACON_STATE_WRITE_TIMES: Result<Histogram> = try_create_histogram(
-        "store_beacon_state_write_seconds",
-        "Total time required to write a BeaconState to the database"
-    );
     pub static ref BEACON_STATE_WRITE_OVERHEAD_TIMES: Result<Histogram> = try_create_histogram(
         "store_beacon_state_write_overhead_seconds",
         "Overhead on writing a beacon state to the DB (e.g., encoding)"
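The new provided method `as_kv_store_op` in `lib.rs` is the bridge between the two layers of operations: any `StoreItem` can lower itself to a raw `KeyValueStoreOp::PutKeyValue` under its own column, which is how blocks, state summaries and restore-point hashes all end up in one key-value batch. A minimal stand-in sketch of the same shape (local trait and helper, not the store crate's):

```rust
// Local stand-ins; not the store crate's `StoreItem`/`KeyValueStoreOp`.
enum KvOp {
    Put(Vec<u8>, Vec<u8>),
}

// Mirrors `get_key_for_col`: column prefix followed by the raw key.
fn key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
    let mut result = column.as_bytes().to_vec();
    result.extend_from_slice(key);
    result
}

trait Item: Sized {
    fn column() -> &'static str;
    fn as_bytes(&self) -> Vec<u8>;

    // Provided method, mirroring the new `StoreItem::as_kv_store_op`.
    fn as_kv_op(&self, key: &[u8]) -> KvOp {
        KvOp::Put(key_for_col(Self::column(), key), self.as_bytes())
    }
}

struct Summary(u64);

impl Item for Summary {
    fn column() -> &'static str {
        "sum"
    }
    fn as_bytes(&self) -> Vec<u8> {
        self.0.to_le_bytes().to_vec()
    }
}

fn main() {
    match Summary(7).as_kv_op(b"root") {
        KvOp::Put(key, value) => {
            assert_eq!(&key[..3], &b"sum"[..]);
            assert_eq!(value.len(), 8);
        }
    }
}
```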
diff --git a/beacon_node/store/src/state_batch.rs b/beacon_node/store/src/state_batch.rs
deleted file mode 100644
index ab2d060c1..000000000
--- a/beacon_node/store/src/state_batch.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use crate::{Error, HotColdDB, HotStateSummary, ItemStore};
-use types::{BeaconState, EthSpec, Hash256};
-
-/// A collection of states to be stored in the database.
-///
-/// Consumes minimal space in memory by not storing states between epoch boundaries.
-#[derive(Debug, Clone, Default)]
-pub struct StateBatch<E: EthSpec> {
-    items: Vec<BatchItem<E>>,
-}
-
-#[derive(Debug, Clone)]
-#[allow(clippy::large_enum_variant)]
-enum BatchItem<E: EthSpec> {
-    Full(Hash256, BeaconState<E>),
-    Summary(Hash256, HotStateSummary),
-}
-
-impl<E: EthSpec> StateBatch<E> {
-    /// Create a new empty batch.
-    pub fn new() -> Self {
-        Self::default()
-    }
-
-    /// Stage a `BeaconState` to be stored.
-    pub fn add_state(&mut self, state_root: Hash256, state: &BeaconState<E>) -> Result<(), Error> {
-        let item = if state.slot % E::slots_per_epoch() == 0 {
-            BatchItem::Full(state_root, state.clone())
-        } else {
-            BatchItem::Summary(state_root, HotStateSummary::new(&state_root, state)?)
-        };
-        self.items.push(item);
-        Ok(())
-    }
-
-    /// Write the batch to the database.
-    ///
-    /// May fail to write the full batch if any of the items error (i.e. not atomic!)
-    pub fn commit<Hot: ItemStore<E>, Cold: ItemStore<E>>(
-        self,
-        store: &HotColdDB<E, Hot, Cold>,
-    ) -> Result<(), Error> {
-        self.items.into_iter().try_for_each(|item| match item {
-            BatchItem::Full(state_root, state) => store.put_state(&state_root, &state),
-            BatchItem::Summary(state_root, summary) => {
-                store.put_state_summary(&state_root, summary)
-            }
-        })
-    }
-}
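The doc comment on the deleted `commit` is the heart of this diff: it wrote one item at a time and admitted it was "not atomic!". Its callers now build `Vec<StoreOp>` batches and go through `HotColdDB::do_atomically` instead. A stand-in model (simplified types, with an injected failure) of the difference in failure modes:

```rust
// Stand-in model of why the per-item commit was replaced: a mid-batch
// failure leaves a prefix behind, an atomic commit does not.
enum Op {
    Full(u64),
    Summary(u64),
}

#[derive(Default)]
struct Store {
    committed: Vec<Op>,
}

impl Store {
    // Old shape (`StateBatch::commit`): one write per item; a failure
    // strands everything already written.
    fn commit_each(&mut self, items: Vec<Op>, fail_at: usize) -> Result<(), ()> {
        for (i, item) in items.into_iter().enumerate() {
            if i == fail_at {
                return Err(()); // a prefix of `items` is already committed
            }
            self.committed.push(item);
        }
        Ok(())
    }

    // New shape (`do_atomically`): all or nothing.
    fn do_atomically(&mut self, items: Vec<Op>, fail: bool) -> Result<(), ()> {
        if fail {
            return Err(()); // nothing was written
        }
        self.committed.extend(items);
        Ok(())
    }
}

fn main() {
    let mut old = Store::default();
    let _ = old.commit_each(vec![Op::Full(0), Op::Summary(1)], 1);
    assert_eq!(old.committed.len(), 1); // stranded partial write

    let mut new = Store::default();
    let _ = new.do_atomically(vec![Op::Full(0), Op::Summary(1)], true);
    assert!(new.committed.is_empty()); // all-or-nothing
}
```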