# Compact database on finalization (#1871)

## Issue Addressed

Closes #1866

## Proposed Changes

* Compact the database on finalization, which removes the deleted states from disk completely. Because compaction runs in the background migrator, it doesn't block other database operations. On my Medalla node it took about 1 minute and shrank the database from 90GB to 9GB. A sketch of the new migration sequence follows this list.
* Fix an inefficiency in the pruning algorithm where the first run after start-up always used the genesis checkpoint as the `old_finalized_checkpoint`. This resulted in loading lots of states one at a time all the way back to genesis, and storing a large number of block roots in memory. The new code stores the old finalized checkpoint on disk and only falls back to genesis if no checkpoint is already stored. This makes it both backwards compatible _and_ forwards compatible -- no schema change required!
* Introduce two new `INFO` logs to indicate when pruning has started and completed. Users seem to want this information without having to enable debug logs!
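
For orientation, the sequence the background migrator now follows is: prune abandoned forks, migrate finalized states to the freezer, then compact. Below is a minimal, self-contained sketch of that flow; every type in it is a simplified stand-in rather than a real Lighthouse type, and the actual implementation is in the `migrate.rs` diff further down.

```rust
// Simplified stand-ins for `Checkpoint` and the store; not the real types.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Checkpoint {
    epoch: u64,
    root: [u8; 32],
}

#[derive(Default)]
struct Store {
    // Persisted on disk in the real store (under PRUNING_CHECKPOINT_KEY).
    pruning_checkpoint: Option<Checkpoint>,
}

impl Store {
    // The first run after upgrade (or on a fresh DB) finds no stored
    // checkpoint, so we fall back to genesis -- this is what makes the
    // change compatible in both directions without a schema bump.
    fn old_finalized_checkpoint(&self) -> Checkpoint {
        self.pruning_checkpoint
            .unwrap_or(Checkpoint { epoch: 0, root: [0; 32] })
    }

    fn prune_abandoned_forks(&mut self, new_finalized: Checkpoint) {
        let old = self.old_finalized_checkpoint();
        println!(
            "INFO Starting database pruning, epochs {}..{}",
            old.epoch, new_finalized.epoch
        );
        // ... delete abandoned blocks/states, then persist the new
        // checkpoint in the same atomic batch so the next prune starts here ...
        self.pruning_checkpoint = Some(new_finalized);
        println!("INFO Database pruning complete");
    }

    fn migrate_database(&mut self) { /* move finalized states to the freezer */ }

    fn compact(&mut self) { /* reclaim the space freed by the deletions */ }
}

fn main() {
    let mut store = Store::default();
    let finalized = Checkpoint { epoch: 100, root: [0xaa; 32] };
    // Prune first, then migrate, then compact, so compaction sees the
    // freshly deleted keys.
    store.prune_abandoned_forks(finalized);
    store.migrate_database();
    store.compact();
}
```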
Commit 556190ff46 (parent b711cfe2bb) by Michael Sproul, 2020-11-09 07:02:21 +00:00.
7 changed files with 113 additions and 27 deletions.

#### `beacon_node/beacon_chain/src/migrate.rs`

```diff
@@ -3,7 +3,7 @@ use crate::errors::BeaconChainError;
 use crate::head_tracker::{HeadTracker, SszHeadTracker};
 use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
 use parking_lot::Mutex;
-use slog::{debug, warn, Logger};
+use slog::{debug, error, info, warn, Logger};
 use std::collections::{HashMap, HashSet};
 use std::mem;
 use std::sync::mpsc;
@@ -29,7 +29,6 @@ pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
             thread::JoinHandle<()>,
         )>,
     >,
-    latest_checkpoint: Arc<Mutex<Checkpoint>>,
     /// Genesis block root, for persisting the `PersistedBeaconChain`.
     genesis_block_root: Hash256,
     log: Logger,
@@ -74,7 +73,6 @@ pub struct MigrationNotification<E: EthSpec> {
     finalized_state: BeaconState<E>,
     finalized_checkpoint: Checkpoint,
     head_tracker: Arc<HeadTracker>,
-    latest_checkpoint: Arc<Mutex<Checkpoint>>,
     genesis_block_root: Hash256,
 }
 
@@ -91,14 +89,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
         } else {
             Some(Mutex::new(Self::spawn_thread(db.clone(), log.clone())))
         };
-        let latest_checkpoint = Arc::new(Mutex::new(Checkpoint {
-            root: Hash256::zero(),
-            epoch: Epoch::new(0),
-        }));
         Self {
             db,
             tx_thread,
-            latest_checkpoint,
             genesis_block_root,
             log,
         }
@@ -121,7 +114,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
             finalized_state,
             finalized_checkpoint,
             head_tracker,
-            latest_checkpoint: self.latest_checkpoint.clone(),
             genesis_block_root: self.genesis_block_root,
         };
 
@@ -164,7 +156,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
         notif: MigrationNotification<E>,
         log: &Logger,
     ) {
-        let mut latest_checkpoint = notif.latest_checkpoint.lock();
         let finalized_state_root = notif.finalized_state_root;
         let finalized_state = notif.finalized_state;
 
@@ -173,11 +164,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
             notif.head_tracker,
             finalized_state_root,
             &finalized_state,
-            *latest_checkpoint,
             notif.finalized_checkpoint,
             notif.genesis_block_root,
             log,
         ) {
+            Ok(PruningOutcome::Successful) => {}
            Ok(PruningOutcome::DeferredConcurrentMutation) => {
                 warn!(
                     log,
@@ -186,18 +177,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
                 );
                 return;
             }
-            Ok(PruningOutcome::Successful) => {
-                // Update the migrator's idea of the latest checkpoint only if the
-                // pruning process was successful.
-                *latest_checkpoint = notif.finalized_checkpoint;
-            }
             Err(e) => {
                 warn!(log, "Block pruning failed"; "error" => format!("{:?}", e));
                 return;
             }
         };
 
-        match migrate_database(db, finalized_state_root.into(), &finalized_state) {
+        match migrate_database(db.clone(), finalized_state_root.into(), &finalized_state) {
             Ok(()) => {}
             Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
                 debug!(
@@ -212,8 +198,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
                     "Database migration failed";
                     "error" => format!("{:?}", e)
                 );
+                return;
             }
         };
+
+        // Finally, compact the database so that new free space is properly reclaimed.
+        debug!(log, "Starting database compaction");
+        if let Err(e) = db.compact() {
+            error!(
+                log,
+                "Database compaction failed";
+                "error" => format!("{:?}", e)
+            );
+        }
+        debug!(log, "Database compaction complete");
     }
 
     /// Spawn a new child thread to run the migration process.
@@ -244,11 +242,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
         head_tracker: Arc<HeadTracker>,
         new_finalized_state_hash: BeaconStateHash,
         new_finalized_state: &BeaconState<E>,
-        old_finalized_checkpoint: Checkpoint,
         new_finalized_checkpoint: Checkpoint,
         genesis_block_root: Hash256,
         log: &Logger,
     ) -> Result<PruningOutcome, BeaconChainError> {
+        let old_finalized_checkpoint =
+            store
+                .load_pruning_checkpoint()?
+                .unwrap_or_else(|| Checkpoint {
+                    epoch: Epoch::new(0),
+                    root: Hash256::zero(),
+                });
+
         let old_finalized_slot = old_finalized_checkpoint
             .epoch
             .start_slot(E::slots_per_epoch());
@@ -267,15 +272,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
                 .into());
         }
 
-        debug!(
+        info!(
             log,
             "Starting database pruning";
             "old_finalized_epoch" => old_finalized_checkpoint.epoch,
-            "old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root),
             "new_finalized_epoch" => new_finalized_checkpoint.epoch,
-            "new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root),
         );
-
         // For each slot between the new finalized checkpoint and the old finalized checkpoint,
         // collect the beacon block root and state root of the canonical chain.
         let newly_finalized_chain: HashMap<Slot, (SignedBeaconBlockHash, BeaconStateHash)> =
@@ -303,7 +305,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
         let mut abandoned_heads: HashSet<Hash256> = HashSet::new();
 
         let heads = head_tracker.heads();
-        debug!(log, "Pruning {} heads", heads.len());
+        debug!(
+            log,
+            "Extra pruning information";
+            "old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root),
+            "new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root),
+            "head_count" => heads.len(),
+        );
 
         for (head_hash, head_slot) in heads {
             let mut potentially_abandoned_head = Some(head_hash);
@@ -457,8 +465,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
         drop(head_tracker_lock);
         kv_batch.push(persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY));
 
+        // Persist the new finalized checkpoint as the pruning checkpoint.
+        kv_batch.push(store.pruning_checkpoint_store_op(new_finalized_checkpoint));
+
         store.hot_db.do_atomically(kv_batch)?;
-        debug!(log, "Database pruning complete");
+        info!(log, "Database pruning complete");
 
         Ok(PruningOutcome::Successful)
     }
```

#### `beacon_node/store/src/garbage_collection.rs`

```diff
@@ -10,7 +10,8 @@ where
 {
     /// Clean up the database by performing one-off maintenance at start-up.
     pub fn remove_garbage(&self) -> Result<(), Error> {
-        self.delete_temp_states()
+        self.delete_temp_states()?;
+        Ok(())
     }
 
     /// Delete the temporary states that were leftover by failed block imports.
```

#### `beacon_node/store/src/hot_cold_store.rs`

```diff
@@ -9,7 +9,8 @@ use crate::leveldb_store::BytesKey;
 use crate::leveldb_store::LevelDB;
 use crate::memory_store::MemoryStore;
 use crate::metadata::{
-    SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, SCHEMA_VERSION_KEY, SPLIT_KEY,
+    PruningCheckpoint, SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY,
+    SCHEMA_VERSION_KEY, SPLIT_KEY,
 };
 use crate::metrics;
 use crate::{
@@ -924,6 +925,25 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             })
         }
     }
+
+    /// Run a compaction pass to free up space used by deleted states.
+    pub fn compact(&self) -> Result<(), Error> {
+        self.hot_db.compact()?;
+        Ok(())
+    }
+
+    /// Load the checkpoint to begin pruning from (the "old finalized checkpoint").
+    pub fn load_pruning_checkpoint(&self) -> Result<Option<Checkpoint>, Error> {
+        Ok(self
+            .hot_db
+            .get(&PRUNING_CHECKPOINT_KEY)?
+            .map(|pc: PruningCheckpoint| pc.checkpoint))
+    }
+
+    /// Create a staged store for the pruning checkpoint.
+    pub fn pruning_checkpoint_store_op(&self, checkpoint: Checkpoint) -> KeyValueStoreOp {
+        PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY)
+    }
 }
 
 /// Advance the split point of the store, moving new finalized states to the freezer.
```

#### `beacon_node/store/src/leveldb_store.rs`

```diff
@@ -1,6 +1,7 @@
 use super::*;
 use crate::metrics;
 use db_key::Key;
+use leveldb::compaction::Compaction;
 use leveldb::database::batch::{Batch, Writebatch};
 use leveldb::database::kv::KV;
 use leveldb::database::Database;
@@ -152,6 +153,27 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
     fn begin_rw_transaction(&self) -> MutexGuard<()> {
         self.transaction_mutex.lock()
     }
+
+    /// Compact all values in the states and states flag columns.
+    fn compact(&self) -> Result<(), Error> {
+        let endpoints = |column: DBColumn| {
+            (
+                BytesKey::from_vec(get_key_for_col(column.as_str(), Hash256::zero().as_bytes())),
+                BytesKey::from_vec(get_key_for_col(
+                    column.as_str(),
+                    Hash256::repeat_byte(0xff).as_bytes(),
+                )),
+            )
+        };
+
+        for (start_key, end_key) in vec![
+            endpoints(DBColumn::BeaconStateTemporary),
+            endpoints(DBColumn::BeaconState),
+        ] {
+            self.db.compact(&start_key, &end_key);
+        }
+        Ok(())
+    }
 }
 
 impl<E: EthSpec> ItemStore<E> for LevelDB<E> {}
```
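
The endpoint closure above works because on-disk keys are the column tag concatenated with a 32-byte key (see `get_key_for_col` in `lib.rs` below), and LevelDB compares keys lexicographically: the all-zeros and all-`0xff` hashes therefore bound every possible key in a column. A standalone illustration (the `"xyz"` column tag is made up for the demo):

```rust
// Mirrors the store's helper: column tag bytes followed by the key bytes.
fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
    let mut result = column.as_bytes().to_vec();
    result.extend_from_slice(key);
    result
}

fn main() {
    let start = get_key_for_col("xyz", &[0x00; 32]); // Hash256::zero()
    let end = get_key_for_col("xyz", &[0xff; 32]); // Hash256::repeat_byte(0xff)
    let some_key = get_key_for_col("xyz", &[0x42; 32]); // an arbitrary state root
    // Vec<u8> compares lexicographically, like LevelDB's default comparator,
    // so every key in the column falls inside the compaction range.
    assert!(start <= some_key && some_key <= end);
    println!("the [start, end] range covers the whole column");
}
```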

#### `beacon_node/store/src/lib.rs`

```diff
@@ -67,6 +67,9 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
     /// This doesn't prevent other threads writing to the DB unless they also use
     /// this method. In future we may implement a safer mandatory locking scheme.
     fn begin_rw_transaction(&self) -> MutexGuard<()>;
+
+    /// Compact the database, freeing space used by deleted items.
+    fn compact(&self) -> Result<(), Error>;
 }
 
 pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
```

#### `beacon_node/store/src/memory_store.rs`

```diff
@@ -84,6 +84,10 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
     fn begin_rw_transaction(&self) -> MutexGuard<()> {
         self.transaction_mutex.lock()
     }
+
+    fn compact(&self) -> Result<(), Error> {
+        Ok(())
+    }
 }
 
 impl<E: EthSpec> ItemStore<E> for MemoryStore<E> {}
```

#### `beacon_node/store/src/metadata.rs`

```diff
@@ -1,6 +1,6 @@
 use crate::{DBColumn, Error, StoreItem};
 use ssz::{Decode, Encode};
-use types::Hash256;
+use types::{Checkpoint, Hash256};
 
 pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(2);
 
@@ -10,6 +10,7 @@ pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(2);
 pub const SCHEMA_VERSION_KEY: Hash256 = Hash256::repeat_byte(0);
 pub const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1);
 pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);
+pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
 pub struct SchemaVersion(pub u64);
@@ -33,3 +34,27 @@ impl StoreItem for SchemaVersion {
         Ok(SchemaVersion(u64::from_ssz_bytes(bytes)?))
     }
 }
+
+/// The checkpoint used for pruning the database.
+///
+/// Updated whenever pruning is successful.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct PruningCheckpoint {
+    pub checkpoint: Checkpoint,
+}
+
+impl StoreItem for PruningCheckpoint {
+    fn db_column() -> DBColumn {
+        DBColumn::BeaconMeta
+    }
+
+    fn as_store_bytes(&self) -> Vec<u8> {
+        self.checkpoint.as_ssz_bytes()
+    }
+
+    fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
+        Ok(PruningCheckpoint {
+            checkpoint: Checkpoint::from_ssz_bytes(bytes)?,
+        })
+    }
+}
```
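
Since `PruningCheckpoint` simply delegates to `Checkpoint`'s SSZ encoding, the new store item can be sanity-checked with a round-trip test. The test below is a sketch and not part of this commit; it assumes `PruningCheckpoint` and the `types` crate are in scope:

```rust
use types::{Checkpoint, Epoch, Hash256};

// Hypothetical round-trip check (not included in this diff).
#[test]
fn pruning_checkpoint_round_trip() {
    let item = PruningCheckpoint {
        checkpoint: Checkpoint {
            epoch: Epoch::new(42),
            root: Hash256::repeat_byte(0xaa),
        },
    };
    let bytes = item.as_store_bytes();
    let decoded = PruningCheckpoint::from_store_bytes(&bytes).unwrap();
    // `PruningCheckpoint` derives `PartialEq`, so the comparison is direct.
    assert_eq!(item, decoded);
}
```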