use crate::beacon_chain::BEACON_CHAIN_DB_KEY;
use crate::errors::BeaconChainError;
use crate::head_tracker::{HeadTracker, SszHeadTracker};
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use parking_lot::Mutex;
use slog::{debug, error, info, warn, Logger};
use std::collections::{HashMap, HashSet};
use std::mem;
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::{migrate_database, HotColdDBError};
use store::iter::RootsIterator;
use store::{Error, ItemStore, StoreItem, StoreOp};
pub use store::{HotColdDB, MemoryStore};
use types::{
    BeaconState, BeaconStateError, BeaconStateHash, Checkpoint, Epoch, EthSpec, Hash256,
    SignedBeaconBlockHash, Slot,
};

/// Compact at least this frequently, finalization permitting (7 days).
const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800;
/// Compact at *most* this frequently, to prevent over-compaction during sync (2 hours).
const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200;
/// Compact after a large finality gap, if we respect `MIN_COMPACTION_PERIOD_SECONDS`.
const COMPACTION_FINALITY_DISTANCE: u64 = 1024;

/// The background migrator runs a thread to perform pruning and migrate state from the hot
/// to the cold database.
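///
/// A rough usage sketch (construction of the `HotColdDB`, `HeadTracker` and `Logger` is elided):
///
/// ```ignore
/// let migrator = BackgroundMigrator::new(db, MigratorConfig::default(), genesis_block_root, log);
/// migrator.process_finalization(finalized_state_root, finalized_checkpoint, head_tracker)?;
/// ```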
pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
    db: Arc<HotColdDB<E, Hot, Cold>>,
    #[allow(clippy::type_complexity)]
    tx_thread: Option<Mutex<(mpsc::Sender<MigrationNotification>, thread::JoinHandle<()>)>>,
    /// Genesis block root, for persisting the `PersistedBeaconChain`.
    genesis_block_root: Hash256,
    log: Logger,
}

#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct MigratorConfig {
    pub blocking: bool,
}

impl MigratorConfig {
    /// Run migrations on the calling thread rather than spawning a background thread.
    pub fn blocking(mut self) -> Self {
        self.blocking = true;
        self
    }
}

/// Pruning can be successful, or in rare cases deferred to a later point.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruningOutcome {
    /// The pruning succeeded and updated the pruning checkpoint from `old_finalized_checkpoint`.
    Successful {
        old_finalized_checkpoint: Checkpoint,
    },
    /// The head tracker was mutated concurrently, so pruning was deferred to a later run.
    DeferredConcurrentMutation,
}

/// Logic errors that can occur during pruning; none of these should ever happen.
#[derive(Debug)]
pub enum PruningError {
    IncorrectFinalizedState {
        state_slot: Slot,
        new_finalized_slot: Slot,
    },
    MissingInfoForCanonicalChain {
        slot: Slot,
    },
    UnexpectedEqualStateRoots,
    UnexpectedUnequalStateRoots,
}

/// Message sent to the migration thread containing the information it needs to run.
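///
/// Only the finalized state *root* is sent over the channel; the state itself is loaded from the
/// database by `run_migration`, which keeps each queued notification small.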
pub struct MigrationNotification {
    finalized_state_root: BeaconStateHash,
    finalized_checkpoint: Checkpoint,
    head_tracker: Arc<HeadTracker>,
    genesis_block_root: Hash256,
}

impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
    /// Create a new `BackgroundMigrator` and spawn its thread if necessary.
    pub fn new(
        db: Arc<HotColdDB<E, Hot, Cold>>,
        config: MigratorConfig,
        genesis_block_root: Hash256,
        log: Logger,
    ) -> Self {
        let tx_thread = if config.blocking {
            None
        } else {
            Some(Mutex::new(Self::spawn_thread(db.clone(), log.clone())))
        };
        Self {
            db,
            tx_thread,
            genesis_block_root,
            log,
        }
    }

    /// Process a finalized checkpoint from the `BeaconChain`.
    ///
    /// If successful, all forks descending from before the `finalized_checkpoint` will be
    /// pruned, and the split point of the database will be advanced to the slot of the finalized
    /// checkpoint.
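    ///
    /// With the default (non-blocking) configuration this only sends a `MigrationNotification`
    /// to the background thread; with `MigratorConfig::blocking` the migration runs on the
    /// calling thread before this function returns.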
    pub fn process_finalization(
        &self,
        finalized_state_root: BeaconStateHash,
        finalized_checkpoint: Checkpoint,
        head_tracker: Arc<HeadTracker>,
    ) -> Result<(), BeaconChainError> {
        let notif = MigrationNotification {
            finalized_state_root,
            finalized_checkpoint,
            head_tracker,
            genesis_block_root: self.genesis_block_root,
        };

        // Async path, on the background thread.
        if let Some(tx_thread) = &self.tx_thread {
            let (ref mut tx, ref mut thread) = *tx_thread.lock();

            // Restart the background thread if it has crashed.
            if let Err(tx_err) = tx.send(notif) {
                let (new_tx, new_thread) = Self::spawn_thread(self.db.clone(), self.log.clone());

                *tx = new_tx;
                let old_thread = mem::replace(thread, new_thread);

                // Join the old thread, which will probably have panicked, or may have
                // halted normally just now as a result of us dropping the old `mpsc::Sender`.
                if let Err(thread_err) = old_thread.join() {
                    warn!(
                        self.log,
                        "Migration thread died, so it was restarted";
                        "reason" => format!("{:?}", thread_err)
                    );
                }

                // Retry at most once, we could recurse but that would risk overflowing the stack.
                let _ = tx.send(tx_err.0);
            }
        }
        // Synchronous path, on the current thread.
        else {
            Self::run_migration(self.db.clone(), notif, &self.log)
        }

        Ok(())
    }

    /// Perform the actual work of `process_finalization`.
    fn run_migration(db: Arc<HotColdDB<E, Hot, Cold>>, notif: MigrationNotification, log: &Logger) {
        let finalized_state_root = notif.finalized_state_root;

        let finalized_state = match db.get_state(&finalized_state_root.into(), None) {
            Ok(Some(state)) => state,
            other => {
                error!(
                    log,
                    "Migrator failed to load state";
                    "state_root" => ?finalized_state_root,
                    "error" => ?other
                );
                return;
            }
        };

        let old_finalized_checkpoint = match Self::prune_abandoned_forks(
            db.clone(),
            notif.head_tracker,
            finalized_state_root,
            &finalized_state,
            notif.finalized_checkpoint,
            notif.genesis_block_root,
            log,
        ) {
            Ok(PruningOutcome::Successful {
                old_finalized_checkpoint,
            }) => old_finalized_checkpoint,
            Ok(PruningOutcome::DeferredConcurrentMutation) => {
                warn!(
                    log,
                    "Pruning deferred because of a concurrent mutation";
                    "message" => "this is expected only very rarely!"
                );
                return;
            }
            Err(e) => {
                warn!(log, "Block pruning failed"; "error" => format!("{:?}", e));
                return;
            }
        };
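
        // Advance the database split point: states up to the newly finalized state are migrated
        // from the hot database into the cold (freezer) database.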
        match migrate_database(db.clone(), finalized_state_root.into(), &finalized_state) {
            Ok(()) => {}
            Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
                debug!(
                    log,
                    "Database migration postponed, unaligned finalized block";
                    "slot" => slot.as_u64()
                );
            }
            Err(e) => {
                warn!(
                    log,
                    "Database migration failed";
                    "error" => format!("{:?}", e)
                );
                return;
            }
        };

        // Finally, compact the database so that new free space is properly reclaimed.
        if let Err(e) = Self::run_compaction(
            db,
            old_finalized_checkpoint.epoch,
            notif.finalized_checkpoint.epoch,
            log,
        ) {
            warn!(log, "Database compaction failed"; "error" => format!("{:?}", e));
        }
    }

    /// Spawn a new child thread to run the migration process.
    ///
    /// Return a channel handle for sending new finalized states to the thread.
    fn spawn_thread(
        db: Arc<HotColdDB<E, Hot, Cold>>,
        log: Logger,
    ) -> (mpsc::Sender<MigrationNotification>, thread::JoinHandle<()>) {
        let (tx, rx) = mpsc::channel();
        let thread = thread::spawn(move || {
            while let Ok(notif) = rx.recv() {
                // Read the rest of the messages in the channel, ultimately choosing the `notif`
                // with the highest finalized epoch.
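                // Processing only the most recent notification avoids doing redundant migration
                // work when finalization outpaces the migrator (see #1923).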
                let notif = rx
                    .try_iter()
                    .fold(notif, |best, other: MigrationNotification| {
                        if other.finalized_checkpoint.epoch > best.finalized_checkpoint.epoch {
                            other
                        } else {
                            best
                        }
                    });

                Self::run_migration(db.clone(), notif, &log);
            }
        });
        (tx, thread)
    }

    /// Traverses live heads and prunes blocks and states of chains that we know can't be built
    /// upon because finalization would prohibit it. This is an optimisation intended to save disk
    /// space.
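    ///
    /// Roughly: (1) map out the newly finalized chain back to the old finalized checkpoint,
    /// (2) walk backwards from every known head, staging blocks and states that are not on that
    /// chain for deletion, (3) drop abandoned heads from the head tracker, and (4) delete
    /// everything in one atomic batch together with the persisted head and the new pruning
    /// checkpoint.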
    #[allow(clippy::too_many_arguments)]
    fn prune_abandoned_forks(
        store: Arc<HotColdDB<E, Hot, Cold>>,
        head_tracker: Arc<HeadTracker>,
        new_finalized_state_hash: BeaconStateHash,
        new_finalized_state: &BeaconState<E>,
        new_finalized_checkpoint: Checkpoint,
        genesis_block_root: Hash256,
        log: &Logger,
    ) -> Result<PruningOutcome, BeaconChainError> {
        let old_finalized_checkpoint =
            store
                .load_pruning_checkpoint()?
                .unwrap_or_else(|| Checkpoint {
                    epoch: Epoch::new(0),
                    root: Hash256::zero(),
                });

        let old_finalized_slot = old_finalized_checkpoint
            .epoch
            .start_slot(E::slots_per_epoch());
        let new_finalized_slot = new_finalized_checkpoint
            .epoch
            .start_slot(E::slots_per_epoch());
        let new_finalized_block_hash = new_finalized_checkpoint.root.into();

        // The finalized state must be for the epoch boundary slot, not the slot of the finalized
        // block.
        if new_finalized_state.slot() != new_finalized_slot {
            return Err(PruningError::IncorrectFinalizedState {
                state_slot: new_finalized_state.slot(),
                new_finalized_slot,
            }
            .into());
        }

        debug!(
            log,
            "Starting database pruning";
            "old_finalized_epoch" => old_finalized_checkpoint.epoch,
            "new_finalized_epoch" => new_finalized_checkpoint.epoch,
        );

        // For each slot between the new finalized checkpoint and the old finalized checkpoint,
        // collect the beacon block root and state root of the canonical chain.
        let newly_finalized_chain: HashMap<Slot, (SignedBeaconBlockHash, BeaconStateHash)> =
            std::iter::once(Ok((
                new_finalized_slot,
                (new_finalized_block_hash, new_finalized_state_hash),
            )))
            .chain(
                RootsIterator::new(store.clone(), new_finalized_state).map(|res| {
                    res.map(|(block_root, state_root, slot)| {
                        (slot, (block_root.into(), state_root.into()))
                    })
                }),
            )
            .take_while(|res| {
                res.as_ref()
                    .map_or(true, |(slot, _)| *slot >= old_finalized_slot)
            })
            .collect::<Result<_, _>>()?;

        // We don't know which blocks are shared among abandoned chains, so we buffer and delete
        // everything in one fell swoop.
        let mut abandoned_blocks: HashSet<SignedBeaconBlockHash> = HashSet::new();
        let mut abandoned_states: HashSet<(Slot, BeaconStateHash)> = HashSet::new();
        let mut abandoned_heads: HashSet<Hash256> = HashSet::new();

        let heads = head_tracker.heads();
        debug!(
            log,
            "Extra pruning information";
            "old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root),
            "new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root),
            "head_count" => heads.len(),
        );

        for (head_hash, head_slot) in heads {
            // Load head block. If it fails with a decode error, it's likely a reverted block,
            // so delete it from the head tracker but leave it and its states in the database.
            // This is suboptimal as it wastes disk space, but it's difficult to fix. A re-sync
            // can be used to reclaim the space.
            let head_state_root = match store.get_block(&head_hash) {
                Ok(Some(block)) => block.state_root(),
                Ok(None) => {
                    return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into())
                }
                Err(Error::SszDecodeError(e)) => {
                    warn!(
                        log,
                        "Forgetting invalid head block";
                        "block_root" => ?head_hash,
                        "error" => ?e,
                    );
                    abandoned_heads.insert(head_hash);
                    continue;
                }
                Err(e) => return Err(e.into()),
            };
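
            // Deletions staged while walking back from this head; they are only committed to the
            // `abandoned_*` sets below if the head turns out to be on an abandoned fork.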
            let mut potentially_abandoned_head = Some(head_hash);
            let mut potentially_abandoned_blocks = vec![];

            // Iterate backwards from this head, staging blocks and states for deletion.
            let iter = std::iter::once(Ok((head_hash, head_state_root, head_slot)))
                .chain(RootsIterator::from_block(store.clone(), head_hash)?);

            for maybe_tuple in iter {
                let (block_root, state_root, slot) = maybe_tuple?;
                let block_root = SignedBeaconBlockHash::from(block_root);
                let state_root = BeaconStateHash::from(state_root);

                match newly_finalized_chain.get(&slot) {
                    // If there's no information about a slot on the finalized chain, then
                    // it should be because it's ahead of the new finalized slot. Stage
                    // the fork's block and state for possible deletion.
                    None => {
                        if slot > new_finalized_slot {
                            potentially_abandoned_blocks.push((
                                slot,
                                Some(block_root),
                                Some(state_root),
                            ));
                        } else if slot >= old_finalized_slot {
                            return Err(PruningError::MissingInfoForCanonicalChain { slot }.into());
                        } else {
                            // We must assume here any candidate chains include the old finalized
                            // checkpoint, i.e. there aren't any forks starting at a block that is a
                            // strict ancestor of old_finalized_checkpoint.
                            warn!(
                                log,
                                "Found a chain that should already have been pruned";
                                "head_block_root" => format!("{:?}", head_hash),
                                "head_slot" => head_slot,
                            );
                            potentially_abandoned_head.take();
                            break;
                        }
                    }
                    Some((finalized_block_root, finalized_state_root)) => {
                        // This fork descends from a newly finalized block, we can stop.
                        if block_root == *finalized_block_root {
                            // Sanity check: if the slot and block root match, then the
                            // state roots should match too.
                            if state_root != *finalized_state_root {
                                return Err(PruningError::UnexpectedUnequalStateRoots.into());
                            }

                            // If the fork descends from the whole finalized chain,
                            // do not prune it. Otherwise continue to delete all
                            // of the blocks and states that have been staged for
                            // deletion so far.
                            if slot == new_finalized_slot {
                                potentially_abandoned_blocks.clear();
                                potentially_abandoned_head.take();
                            }
                            // If there are skipped slots on the fork to be pruned, then
                            // we will have just staged the common block for deletion.
                            // Unstage it.
                            else {
                                for (_, block_root, _) in
                                    potentially_abandoned_blocks.iter_mut().rev()
                                {
                                    if block_root.as_ref() == Some(finalized_block_root) {
                                        *block_root = None;
                                    } else {
                                        break;
                                    }
                                }
                            }
                            break;
                        } else {
                            if state_root == *finalized_state_root {
                                return Err(PruningError::UnexpectedEqualStateRoots.into());
                            }
                            potentially_abandoned_blocks.push((
                                slot,
                                Some(block_root),
                                Some(state_root),
                            ));
                        }
                    }
                }
            }

            if let Some(abandoned_head) = potentially_abandoned_head {
                debug!(
                    log,
                    "Pruning head";
                    "head_block_root" => format!("{:?}", abandoned_head),
                    "head_slot" => head_slot,
                );
                abandoned_heads.insert(abandoned_head);
                abandoned_blocks.extend(
                    potentially_abandoned_blocks
                        .iter()
                        .filter_map(|(_, maybe_block_hash, _)| *maybe_block_hash),
                );
                abandoned_states.extend(potentially_abandoned_blocks.iter().filter_map(
                    |(slot, _, maybe_state_hash)| maybe_state_hash.map(|sr| (*slot, sr)),
                ));
            }
        }

        // Update the head tracker before the database, so that we maintain the invariant
        // that a block present in the head tracker is present in the database.
        // See https://github.com/sigp/lighthouse/issues/1557
        let mut head_tracker_lock = head_tracker.0.write();

        // Check that all the heads to be deleted are still present. The absence of any
        // head indicates a race that will likely resolve itself, so we defer pruning until
        // later.
        for head_hash in &abandoned_heads {
            if !head_tracker_lock.contains_key(head_hash) {
                return Ok(PruningOutcome::DeferredConcurrentMutation);
            }
        }

        // Then remove them for real.
        for head_hash in abandoned_heads {
            head_tracker_lock.remove(&head_hash);
        }

        let batch: Vec<StoreOp<E>> = abandoned_blocks
            .into_iter()
            .map(Into::into)
            .map(StoreOp::DeleteBlock)
            .chain(
                abandoned_states
                    .into_iter()
                    .map(|(slot, state_hash)| StoreOp::DeleteState(state_hash.into(), Some(slot))),
            )
            .collect();

        let mut kv_batch = store.convert_to_kv_batch(&batch)?;

        // Persist the head in case the process is killed or crashes here. This prevents
        // the head tracker reverting after our mutation above.
        let persisted_head = PersistedBeaconChain {
            _canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT,
            genesis_block_root,
            ssz_head_tracker: SszHeadTracker::from_map(&*head_tracker_lock),
        };
        drop(head_tracker_lock);
        kv_batch.push(persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY));

        // Persist the new finalized checkpoint as the pruning checkpoint.
        kv_batch.push(store.pruning_checkpoint_store_op(new_finalized_checkpoint));

        store.hot_db.do_atomically(kv_batch)?;
        debug!(log, "Database pruning complete");

        Ok(PruningOutcome::Successful {
            old_finalized_checkpoint,
        })
    }

    /// Compact the database if it has been more than `MAX_COMPACTION_PERIOD_SECONDS` since it
    /// was last compacted, or if finality has advanced by more than
    /// `COMPACTION_FINALITY_DISTANCE` epochs and at least `MIN_COMPACTION_PERIOD_SECONDS` have
    /// passed since the last compaction.
    pub fn run_compaction(
        db: Arc<HotColdDB<E, Hot, Cold>>,
        old_finalized_epoch: Epoch,
        new_finalized_epoch: Epoch,
        log: &Logger,
    ) -> Result<(), Error> {
        if !db.compact_on_prune() {
            return Ok(());
        }

        let last_compaction_timestamp = db
            .load_compaction_timestamp()?
            .unwrap_or_else(|| Duration::from_secs(0));
        let start_time = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or(last_compaction_timestamp);
        let seconds_since_last_compaction = start_time
            .checked_sub(last_compaction_timestamp)
            .as_ref()
            .map_or(0, Duration::as_secs);
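
        // Compact if it has been more than MAX_COMPACTION_PERIOD_SECONDS (7 days) since the last
        // compaction, or if finality has jumped by more than COMPACTION_FINALITY_DISTANCE epochs
        // and at least MIN_COMPACTION_PERIOD_SECONDS (2 hours) have elapsed.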
        if seconds_since_last_compaction > MAX_COMPACTION_PERIOD_SECONDS
            || (new_finalized_epoch - old_finalized_epoch > COMPACTION_FINALITY_DISTANCE
                && seconds_since_last_compaction > MIN_COMPACTION_PERIOD_SECONDS)
        {
            info!(
                log,
                "Starting database compaction";
                "old_finalized_epoch" => old_finalized_epoch,
                "new_finalized_epoch" => new_finalized_epoch,
            );
            db.compact()?;

            let finish_time = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or(start_time);
            db.store_compaction_timestamp(finish_time)?;

            info!(log, "Database compaction complete");
        }
        Ok(())
    }
}