From 556190ff46960b30a99adf33927ea551de04506f Mon Sep 17 00:00:00 2001
From: Michael Sproul
Date: Mon, 9 Nov 2020 07:02:21 +0000
Subject: [PATCH] Compact database on finalization (#1871)

## Issue Addressed

Closes #1866

## Proposed Changes

* Compact the database on finalization. This removes the deleted states from disk completely. Because it happens in the background migrator, it doesn't block other database operations while it runs. On my Medalla node it took about 1 minute and shrank the database from 90GB to 9GB.
* Fix an inefficiency in the pruning algorithm where it would always use the genesis checkpoint as the `old_finalized_checkpoint` when running for the first time after start-up. This would result in loading lots of states one-at-a-time back to genesis, and storing a lot of block roots in memory. The new code stores the old finalized checkpoint on disk and only uses genesis if no checkpoint is already stored. This makes it both backwards compatible _and_ forwards compatible -- no schema change required!
* Introduce two new `INFO` logs to indicate when pruning has started and completed. Users seem to want to know this information without enabling debug logs!

--- beacon_node/beacon_chain/src/migrate.rs | 59 ++++++++++++--------- beacon_node/store/src/garbage_collection.rs | 3 +- beacon_node/store/src/hot_cold_store.rs | 22 +++++++- beacon_node/store/src/leveldb_store.rs | 22 ++++++++ beacon_node/store/src/lib.rs | 3 ++ beacon_node/store/src/memory_store.rs | 4 ++ beacon_node/store/src/metadata.rs | 27 +++++++++- 7 files changed, 113 insertions(+), 27 deletions(-) diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 7b41cb31e..4d2814233 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -3,7 +3,7 @@ use crate::errors::BeaconChainError; use crate::head_tracker::{HeadTracker, SszHeadTracker}; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use parking_lot::Mutex; -use slog::{debug, warn, Logger}; +use slog::{debug, error, info, warn, Logger}; use std::collections::{HashMap, HashSet}; use std::mem; use std::sync::mpsc; @@ -29,7 +29,6 @@ pub struct BackgroundMigrator, Cold: ItemStore> thread::JoinHandle<()>, )>, >, - latest_checkpoint: Arc>, /// Genesis block root, for persisting the `PersistedBeaconChain`. 
genesis_block_root: Hash256, log: Logger, @@ -74,7 +73,6 @@ pub struct MigrationNotification { finalized_state: BeaconState, finalized_checkpoint: Checkpoint, head_tracker: Arc, - latest_checkpoint: Arc>, genesis_block_root: Hash256, } @@ -91,14 +89,9 @@ impl, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator, log: &Logger, ) { - let mut latest_checkpoint = notif.latest_checkpoint.lock(); let finalized_state_root = notif.finalized_state_root; let finalized_state = notif.finalized_state; @@ -173,11 +164,11 @@ impl, Cold: ItemStore> BackgroundMigrator {} Ok(PruningOutcome::DeferredConcurrentMutation) => { warn!( log, @@ -186,18 +177,13 @@ impl, Cold: ItemStore> BackgroundMigrator { - // Update the migrator's idea of the latest checkpoint only if the - // pruning process was successful. - *latest_checkpoint = notif.finalized_checkpoint; - } Err(e) => { warn!(log, "Block pruning failed"; "error" => format!("{:?}", e)); return; } }; - match migrate_database(db, finalized_state_root.into(), &finalized_state) { + match migrate_database(db.clone(), finalized_state_root.into(), &finalized_state) { Ok(()) => {} Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => { debug!( @@ -212,8 +198,20 @@ impl, Cold: ItemStore> BackgroundMigrator format!("{:?}", e) ); + return; } }; + + // Finally, compact the database so that new free space is properly reclaimed. + debug!(log, "Starting database compaction"); + if let Err(e) = db.compact() { + error!( + log, + "Database compaction failed"; + "error" => format!("{:?}", e) + ); + } + debug!(log, "Database compaction complete"); } /// Spawn a new child thread to run the migration process. 
@@ -244,11 +242,18 @@ impl, Cold: ItemStore> BackgroundMigrator, new_finalized_state_hash: BeaconStateHash, new_finalized_state: &BeaconState, - old_finalized_checkpoint: Checkpoint, new_finalized_checkpoint: Checkpoint, genesis_block_root: Hash256, log: &Logger, ) -> Result { + let old_finalized_checkpoint = + store + .load_pruning_checkpoint()? + .unwrap_or_else(|| Checkpoint { + epoch: Epoch::new(0), + root: Hash256::zero(), + }); + let old_finalized_slot = old_finalized_checkpoint .epoch .start_slot(E::slots_per_epoch()); @@ -267,15 +272,12 @@ impl, Cold: ItemStore> BackgroundMigrator old_finalized_checkpoint.epoch, - "old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root), "new_finalized_epoch" => new_finalized_checkpoint.epoch, - "new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root), ); - // For each slot between the new finalized checkpoint and the old finalized checkpoint, // collect the beacon block root and state root of the canonical chain. let newly_finalized_chain: HashMap = @@ -303,7 +305,13 @@ impl, Cold: ItemStore> BackgroundMigrator = HashSet::new(); let heads = head_tracker.heads(); - debug!(log, "Pruning {} heads", heads.len()); + debug!( + log, + "Extra pruning information"; + "old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root), + "new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root), + "head_count" => heads.len(), + ); for (head_hash, head_slot) in heads { let mut potentially_abandoned_head = Some(head_hash); @@ -457,8 +465,11 @@ impl, Cold: ItemStore> BackgroundMigrator Result<(), Error> { - self.delete_temp_states() + self.delete_temp_states()?; + Ok(()) } /// Delete the temporary states that were leftover by failed block imports. 
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index e8b21c870..2323c3aa2 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -9,7 +9,8 @@ use crate::leveldb_store::BytesKey; use crate::leveldb_store::LevelDB; use crate::memory_store::MemoryStore; use crate::metadata::{ - SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, SCHEMA_VERSION_KEY, SPLIT_KEY, + PruningCheckpoint, SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY, + SCHEMA_VERSION_KEY, SPLIT_KEY, }; use crate::metrics; use crate::{ @@ -924,6 +925,25 @@ impl, Cold: ItemStore> HotColdDB }) } } + + /// Run a compaction pass to free up space used by deleted states. + pub fn compact(&self) -> Result<(), Error> { + self.hot_db.compact()?; + Ok(()) + } + + /// Load the checkpoint to begin pruning from (the "old finalized checkpoint"). + pub fn load_pruning_checkpoint(&self) -> Result, Error> { + Ok(self + .hot_db + .get(&PRUNING_CHECKPOINT_KEY)? + .map(|pc: PruningCheckpoint| pc.checkpoint)) + } + + /// Create a staged store for the pruning checkpoint. + pub fn pruning_checkpoint_store_op(&self, checkpoint: Checkpoint) -> KeyValueStoreOp { + PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY) + } } /// Advance the split point of the store, moving new finalized states to the freezer. 
diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index fc0dea487..04d0141af 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -1,6 +1,7 @@ use super::*; use crate::metrics; use db_key::Key; +use leveldb::compaction::Compaction; use leveldb::database::batch::{Batch, Writebatch}; use leveldb::database::kv::KV; use leveldb::database::Database; @@ -152,6 +153,27 @@ impl KeyValueStore for LevelDB { fn begin_rw_transaction(&self) -> MutexGuard<()> { self.transaction_mutex.lock() } + + /// Compact all values in the states and states flag columns. + fn compact(&self) -> Result<(), Error> { + let endpoints = |column: DBColumn| { + ( + BytesKey::from_vec(get_key_for_col(column.as_str(), Hash256::zero().as_bytes())), + BytesKey::from_vec(get_key_for_col( + column.as_str(), + Hash256::repeat_byte(0xff).as_bytes(), + )), + ) + }; + + for (start_key, end_key) in vec![ + endpoints(DBColumn::BeaconStateTemporary), + endpoints(DBColumn::BeaconState), + ] { + self.db.compact(&start_key, &end_key); + } + Ok(()) + } } impl ItemStore for LevelDB {} diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 38411d9a1..264f0e94c 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -67,6 +67,9 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { /// This doesn't prevent other threads writing to the DB unless they also use /// this method. In future we may implement a safer mandatory locking scheme. fn begin_rw_transaction(&self) -> MutexGuard<()>; + + /// Compact the database, freeing space used by deleted items. 
+ fn compact(&self) -> Result<(), Error>; } pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 2df503965..3ff39c67f 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -84,6 +84,10 @@ impl KeyValueStore for MemoryStore { fn begin_rw_transaction(&self) -> MutexGuard<()> { self.transaction_mutex.lock() } + + fn compact(&self) -> Result<(), Error> { + Ok(()) + } } impl ItemStore for MemoryStore {} diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index c03046986..797a2b633 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -1,6 +1,6 @@ use crate::{DBColumn, Error, StoreItem}; use ssz::{Decode, Encode}; -use types::Hash256; +use types::{Checkpoint, Hash256}; pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(2); @@ -10,6 +10,7 @@ pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(2); pub const SCHEMA_VERSION_KEY: Hash256 = Hash256::repeat_byte(0); pub const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1); pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2); +pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3); #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct SchemaVersion(pub u64); @@ -33,3 +34,27 @@ impl StoreItem for SchemaVersion { Ok(SchemaVersion(u64::from_ssz_bytes(bytes)?)) } } + +/// The checkpoint used for pruning the database. +/// +/// Updated whenever pruning is successful. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct PruningCheckpoint {
+    pub checkpoint: Checkpoint,
+}
+
+impl StoreItem for PruningCheckpoint {
+    fn db_column() -> DBColumn {
+        DBColumn::BeaconMeta
+    }
+
+    fn as_store_bytes(&self) -> Vec<u8> {
+        self.checkpoint.as_ssz_bytes()
+    }
+
+    fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
+        Ok(PruningCheckpoint {
+            checkpoint: Checkpoint::from_ssz_bytes(bytes)?,
+        })
+    }
+}