diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs
index 7b41cb31e..4d2814233 100644
--- a/beacon_node/beacon_chain/src/migrate.rs
+++ b/beacon_node/beacon_chain/src/migrate.rs
@@ -3,7 +3,7 @@ use crate::errors::BeaconChainError;
 use crate::head_tracker::{HeadTracker, SszHeadTracker};
 use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
 use parking_lot::Mutex;
-use slog::{debug, warn, Logger};
+use slog::{debug, error, info, warn, Logger};
 use std::collections::{HashMap, HashSet};
 use std::mem;
 use std::sync::mpsc;
@@ -29,7 +29,6 @@ pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
             thread::JoinHandle<()>,
         )>,
     >,
-    latest_checkpoint: Arc<Mutex<Checkpoint>>,
     /// Genesis block root, for persisting the `PersistedBeaconChain`.
     genesis_block_root: Hash256,
     log: Logger,
@@ -74,7 +73,6 @@ pub struct MigrationNotification<E: EthSpec> {
     finalized_state: BeaconState<E>,
     finalized_checkpoint: Checkpoint,
     head_tracker: Arc<HeadTracker>,
-    latest_checkpoint: Arc<Mutex<Checkpoint>>,
     genesis_block_root: Hash256,
 }
@@ -91,14 +89,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
         log: &Logger,
     ) {
-        let mut latest_checkpoint = notif.latest_checkpoint.lock();
         let finalized_state_root = notif.finalized_state_root;
         let finalized_state = notif.finalized_state;
@@ -173,11 +164,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
             Ok(PruningOutcome::DeferredConcurrentMutation) => {
                 warn!(
                     log,
@@ -186,18 +177,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
-                // Update the migrator's idea of the latest checkpoint only if the
-                // pruning process was successful.
-                *latest_checkpoint = notif.finalized_checkpoint;
-            }
             Err(e) => {
                 warn!(log, "Block pruning failed"; "error" => format!("{:?}", e));
                 return;
             }
         };
 
-        match migrate_database(db, finalized_state_root.into(), &finalized_state) {
+        match migrate_database(db.clone(), finalized_state_root.into(), &finalized_state) {
             Ok(()) => {}
             Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
                 debug!(
@@ -212,8 +198,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
                     "error" => format!("{:?}", e)
                 );
+                return;
             }
         };
+
+        // Finally, compact the database so that new free space is properly reclaimed.
+        debug!(log, "Starting database compaction");
+        if let Err(e) = db.compact() {
+            error!(
+                log,
+                "Database compaction failed";
+                "error" => format!("{:?}", e)
+            );
+        }
+        debug!(log, "Database compaction complete");
     }
 
     /// Spawn a new child thread to run the migration process.
@@ -244,11 +242,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
         store: Arc<HotColdDB<E, Hot, Cold>>,
         new_finalized_state_hash: BeaconStateHash,
         new_finalized_state: &BeaconState<E>,
-        old_finalized_checkpoint: Checkpoint,
         new_finalized_checkpoint: Checkpoint,
         genesis_block_root: Hash256,
         log: &Logger,
     ) -> Result<PruningOutcome, BeaconChainError> {
+        let old_finalized_checkpoint =
+            store
+                .load_pruning_checkpoint()?
+                .unwrap_or_else(|| Checkpoint {
+                    epoch: Epoch::new(0),
+                    root: Hash256::zero(),
+                });
+
         let old_finalized_slot = old_finalized_checkpoint
             .epoch
             .start_slot(E::slots_per_epoch());
@@ -267,15 +272,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
             "old_finalized_epoch" => old_finalized_checkpoint.epoch,
-            "old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root),
             "new_finalized_epoch" => new_finalized_checkpoint.epoch,
-            "new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root),
         );
-
         // For each slot between the new finalized checkpoint and the old finalized checkpoint,
         // collect the beacon block root and state root of the canonical chain.
         let newly_finalized_chain: HashMap<Slot, (SignedBeaconBlockHash, BeaconStateHash)> =
@@ -303,7 +305,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
         let mut abandoned_heads: HashSet<Hash256> = HashSet::new();
 
         let heads = head_tracker.heads();
-        debug!(log, "Pruning {} heads", heads.len());
+        debug!(
+            log,
+            "Extra pruning information";
+            "old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root),
+            "new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root),
+            "head_count" => heads.len(),
+        );
 
         for (head_hash, head_slot) in heads {
             let mut potentially_abandoned_head = Some(head_hash);
@@ -457,8 +465,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
     pub fn remove_garbage(&self) -> Result<(), Error> {
-        self.delete_temp_states()
+        self.delete_temp_states()?;
+        Ok(())
     }
 
     /// Delete the temporary states that were leftover by failed block imports.
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index e8b21c870..2323c3aa2 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -9,7 +9,8 @@ use crate::leveldb_store::BytesKey;
 use crate::leveldb_store::LevelDB;
 use crate::memory_store::MemoryStore;
 use crate::metadata::{
-    SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, SCHEMA_VERSION_KEY, SPLIT_KEY,
+    PruningCheckpoint, SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY,
+    SCHEMA_VERSION_KEY, SPLIT_KEY,
 };
 use crate::metrics;
 use crate::{
@@ -924,6 +925,25 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             })
         }
     }
+
+    /// Run a compaction pass to free up space used by deleted states.
+    pub fn compact(&self) -> Result<(), Error> {
+        self.hot_db.compact()?;
+        Ok(())
+    }
+
+    /// Load the checkpoint to begin pruning from (the "old finalized checkpoint").
+    pub fn load_pruning_checkpoint(&self) -> Result<Option<Checkpoint>, Error> {
+        Ok(self
+            .hot_db
+            .get(&PRUNING_CHECKPOINT_KEY)?
+            .map(|pc: PruningCheckpoint| pc.checkpoint))
+    }
+
+    /// Create a staged store for the pruning checkpoint.
+    pub fn pruning_checkpoint_store_op(&self, checkpoint: Checkpoint) -> KeyValueStoreOp {
+        PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY)
+    }
 }
 
 /// Advance the split point of the store, moving new finalized states to the freezer.
diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs
index fc0dea487..04d0141af 100644
--- a/beacon_node/store/src/leveldb_store.rs
+++ b/beacon_node/store/src/leveldb_store.rs
@@ -1,6 +1,7 @@
 use super::*;
 use crate::metrics;
 use db_key::Key;
+use leveldb::compaction::Compaction;
 use leveldb::database::batch::{Batch, Writebatch};
 use leveldb::database::kv::KV;
 use leveldb::database::Database;
@@ -152,6 +153,27 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
     fn begin_rw_transaction(&self) -> MutexGuard<()> {
         self.transaction_mutex.lock()
     }
+
+    /// Compact all values in the states and states flag columns.
+    fn compact(&self) -> Result<(), Error> {
+        let endpoints = |column: DBColumn| {
+            (
+                BytesKey::from_vec(get_key_for_col(column.as_str(), Hash256::zero().as_bytes())),
+                BytesKey::from_vec(get_key_for_col(
+                    column.as_str(),
+                    Hash256::repeat_byte(0xff).as_bytes(),
+                )),
+            )
+        };
+
+        for (start_key, end_key) in vec![
+            endpoints(DBColumn::BeaconStateTemporary),
+            endpoints(DBColumn::BeaconState),
+        ] {
+            self.db.compact(&start_key, &end_key);
+        }
+        Ok(())
+    }
 }
 
 impl<E: EthSpec> ItemStore<E> for LevelDB<E> {}
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index 38411d9a1..264f0e94c 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -67,6 +67,9 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
     /// This doesn't prevent other threads writing to the DB unless they also use
     /// this method. In future we may implement a safer mandatory locking scheme.
     fn begin_rw_transaction(&self) -> MutexGuard<()>;
+
+    /// Compact the database, freeing space used by deleted items.
+    fn compact(&self) -> Result<(), Error>;
 }
 
 pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs
index 2df503965..3ff39c67f 100644
--- a/beacon_node/store/src/memory_store.rs
+++ b/beacon_node/store/src/memory_store.rs
@@ -84,6 +84,10 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
     fn begin_rw_transaction(&self) -> MutexGuard<()> {
         self.transaction_mutex.lock()
     }
+
+    fn compact(&self) -> Result<(), Error> {
+        Ok(())
+    }
 }
 
 impl<E: EthSpec> ItemStore<E> for MemoryStore<E> {}
diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs
index c03046986..797a2b633 100644
--- a/beacon_node/store/src/metadata.rs
+++ b/beacon_node/store/src/metadata.rs
@@ -1,6 +1,6 @@
 use crate::{DBColumn, Error, StoreItem};
 use ssz::{Decode, Encode};
-use types::Hash256;
+use types::{Checkpoint, Hash256};
 
 pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(2);
 
@@ -10,6 +10,7 @@ pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(2);
 pub const SCHEMA_VERSION_KEY: Hash256 = Hash256::repeat_byte(0);
 pub const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1);
 pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);
+pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
 pub struct SchemaVersion(pub u64);
@@ -33,3 +34,27 @@ impl StoreItem for SchemaVersion {
         Ok(SchemaVersion(u64::from_ssz_bytes(bytes)?))
     }
 }
+
+/// The checkpoint used for pruning the database.
+///
+/// Updated whenever pruning is successful.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct PruningCheckpoint {
+    pub checkpoint: Checkpoint,
+}
+
+impl StoreItem for PruningCheckpoint {
+    fn db_column() -> DBColumn {
+        DBColumn::BeaconMeta
+    }
+
+    fn as_store_bytes(&self) -> Vec<u8> {
+        self.checkpoint.as_ssz_bytes()
+    }
+
+    fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
+        Ok(PruningCheckpoint {
+            checkpoint: Checkpoint::from_ssz_bytes(bytes)?,
+        })
+    }
+}
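A note on the new `compact` implementation: it bounds each LevelDB range compaction with the smallest and largest possible 32-byte keys inside a column (`Hash256::zero()` to `Hash256::repeat_byte(0xff)`), so the whole `BeaconState` and `BeaconStateTemporary` columns are covered and space freed by deleted states can be reclaimed. The standalone sketch below illustrates that key construction only; the `get_key_for_col` body and the column tags used here are illustrative assumptions, not the actual values from `lib.rs` and `DBColumn`.

```rust
// Standalone sketch (not part of the patch): deriving the start/end keys for a
// full-column range compaction. `get_key_for_col` is reimplemented as plain
// concatenation purely for illustration.

fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
    let mut result = column.as_bytes().to_vec();
    result.extend_from_slice(key);
    result
}

fn main() {
    // Hypothetical column tags standing in for the states and temporary-state columns.
    for column in ["bst", "bss"] {
        // Every key in these columns is a 32-byte root, so the range from all-zero
        // bytes to all-0xff bytes spans the entire column, mirroring
        // `Hash256::zero()` .. `Hash256::repeat_byte(0xff)` in the `compact` impl.
        let start_key = get_key_for_col(column, &[0x00; 32]);
        let end_key = get_key_for_col(column, &[0xff; 32]);

        println!(
            "column {:?}: compact {} .. {}",
            column,
            hex(&start_key),
            hex(&end_key)
        );
    }
}

/// Minimal hex formatter so the sketch has no external dependencies.
fn hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{:02x}", b)).collect()
}
```

Running the sketch just prints the two boundary keys per column, which is the entire range handed to `self.db.compact(&start_key, &end_key)` in the LevelDB implementation above.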