Harden the freezing procedure against failures (#1323)
* Enable logging in tests * Migrate states to the freezer atomically
This commit is contained in:
parent
9dab928572
commit
c7f47af9fb
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -359,6 +359,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"serde_yaml",
|
||||
"slog",
|
||||
"slog-term",
|
||||
"sloggers",
|
||||
"slot_clock",
|
||||
"smallvec 1.4.0",
|
||||
|
@ -25,6 +25,7 @@ serde_derive = "1.0.110"
|
||||
serde_yaml = "0.8.11"
|
||||
serde_json = "1.0.52"
|
||||
slog = { version = "2.5.2", features = ["max_level_trace"] }
|
||||
slog-term = "2.6.0"
|
||||
sloggers = "1.0.0"
|
||||
slot_clock = { path = "../../common/slot_clock" }
|
||||
eth2_hashing = "0.1.0"
|
||||
|
@ -3,6 +3,11 @@
|
||||
#[macro_use]
|
||||
extern crate lazy_static;
|
||||
|
||||
#[macro_use]
|
||||
extern crate slog;
|
||||
extern crate slog_term;
|
||||
|
||||
use crate::slog::Drain;
|
||||
use beacon_chain::attestation_verification::Error as AttnError;
|
||||
use beacon_chain::test_utils::{
|
||||
AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType,
|
||||
@ -10,7 +15,6 @@ use beacon_chain::test_utils::{
|
||||
use beacon_chain::BeaconSnapshot;
|
||||
use beacon_chain::StateSkipConfig;
|
||||
use rand::Rng;
|
||||
use sloggers::{null::NullLoggerBuilder, Build};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
@ -40,7 +44,11 @@ fn get_store(db_path: &TempDir) -> Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>> {
|
||||
let hot_path = db_path.path().join("hot_db");
|
||||
let cold_path = db_path.path().join("cold_db");
|
||||
let config = StoreConfig::default();
|
||||
let log = NullLoggerBuilder.build().expect("logger should build");
|
||||
|
||||
let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter);
|
||||
let drain = slog_term::FullFormat::new(decorator).build();
|
||||
let log = slog::Logger::root(std::sync::Mutex::new(drain).fuse(), o!());
|
||||
|
||||
Arc::new(
|
||||
HotColdDB::open(&hot_path, &cold_path, config, spec, log)
|
||||
.expect("disk store should initialize"),
|
||||
|
@ -1,7 +1,7 @@
|
||||
use crate::chunked_vector::ChunkError;
|
||||
use crate::hot_cold_store::HotColdDBError;
|
||||
use ssz::DecodeError;
|
||||
use types::{BeaconStateError, Hash256};
|
||||
use types::{BeaconStateError, Hash256, Slot};
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
@ -16,6 +16,7 @@ pub enum Error {
|
||||
RlpError(String),
|
||||
BlockNotFound(Hash256),
|
||||
NoContinuationData,
|
||||
SplitPointModified(Slot, Slot),
|
||||
}
|
||||
|
||||
impl From<DecodeError> for Error {
|
||||
|
@ -14,7 +14,7 @@ use crate::{
|
||||
};
|
||||
use lru::LruCache;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use slog::{debug, trace, warn, Logger};
|
||||
use slog::{debug, error, trace, warn, Logger};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use state_processing::{
|
||||
@ -676,13 +676,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
Ok(split)
|
||||
}
|
||||
|
||||
/// Store the split point on disk.
|
||||
fn store_split(&self) -> Result<(), Error> {
|
||||
let key = Hash256::from_slice(SPLIT_DB_KEY.as_bytes());
|
||||
self.hot_db.put(&key, &*self.split.read())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Load the state root of a restore point.
|
||||
fn load_restore_point_hash(&self, restore_point_index: u64) -> Result<Hash256, Error> {
|
||||
let key = Self::restore_point_key(restore_point_index);
|
||||
@ -717,13 +710,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
.map(|s: ColdStateSummary| s.slot))
|
||||
}
|
||||
|
||||
/// Store the slot of a frozen state.
|
||||
fn store_cold_state_slot(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
|
||||
self.cold_db
|
||||
.put(state_root, &ColdStateSummary { slot })
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Load a hot state's summary, given its root.
|
||||
pub fn load_hot_state_summary(
|
||||
&self,
|
||||
@ -778,7 +764,7 @@ pub fn process_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
// 0. Check that the migration is sensible.
|
||||
// The new frozen head must increase the current split slot, and lie on an epoch
|
||||
// boundary (in order for the hot state summary scheme to work).
|
||||
let current_split_slot = store.get_split_slot();
|
||||
let current_split_slot = store.split.read().slot;
|
||||
|
||||
if frozen_head.slot < current_split_slot {
|
||||
return Err(HotColdDBError::FreezeSlotError {
|
||||
@ -792,45 +778,93 @@ pub fn process_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot).into());
|
||||
}
|
||||
|
||||
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
|
||||
|
||||
// 1. Copy all of the states between the head and the split slot, from the hot DB
|
||||
// to the cold DB.
|
||||
let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head);
|
||||
|
||||
let mut to_delete = vec![];
|
||||
for maybe_pair in state_root_iter.take_while(|result| match result {
|
||||
Ok((_, slot)) => slot >= ¤t_split_slot,
|
||||
Err(_) => true,
|
||||
}) {
|
||||
let (state_root, slot) = maybe_pair?;
|
||||
|
||||
let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();
|
||||
|
||||
if slot % store.config.slots_per_restore_point == 0 {
|
||||
let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root)?
|
||||
.ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?;
|
||||
|
||||
let mut ops: Vec<KeyValueStoreOp> = Vec::new();
|
||||
store.store_cold_state(&state_root, &state, &mut ops)?;
|
||||
store.cold_db.do_atomically(ops)?;
|
||||
store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
|
||||
}
|
||||
|
||||
// Store a pointer from this state root to its slot, so we can later reconstruct states
|
||||
// from their state root alone.
|
||||
store.store_cold_state_slot(&state_root, slot)?;
|
||||
let cold_state_summary = ColdStateSummary { slot };
|
||||
let op = cold_state_summary.as_kv_store_op(state_root);
|
||||
cold_db_ops.push(op);
|
||||
|
||||
// There are data dependencies between calls to `store_cold_state()` that prevent us from
|
||||
// doing one big call to `store.cold_db.do_atomically()` at end of the loop.
|
||||
store.cold_db.do_atomically(cold_db_ops)?;
|
||||
|
||||
// Delete the old summary, and the full state if we lie on an epoch boundary.
|
||||
to_delete.push((state_root, slot));
|
||||
hot_db_ops.push(StoreOp::DeleteState(state_root.into(), slot));
|
||||
}
|
||||
|
||||
// 2. Update the split slot
|
||||
*store.split.write() = Split {
|
||||
slot: frozen_head.slot,
|
||||
state_root: frozen_head_root,
|
||||
};
|
||||
store.store_split()?;
|
||||
// Warning: Critical section. We have to take care not to put any of the two databases in an
|
||||
// inconsistent state if the OS process dies at any point during the freezeing
|
||||
// procedure.
|
||||
//
|
||||
// Since it is pretty much impossible to be atomic across more than one database, we trade
|
||||
// losing track of states to delete, for consistency. In other words: We should be safe to die
|
||||
// at any point below but it may happen that some states won't be deleted from the hot database
|
||||
// and will remain there forever. Since dying in these particular few lines should be an
|
||||
// exceedingly rare event, this should be an acceptable tradeoff.
|
||||
|
||||
// 3. Delete from the hot DB
|
||||
for (state_root, slot) in to_delete {
|
||||
store.delete_state(&state_root, slot)?;
|
||||
// Flush to disk all the states that have just been migrated to the cold store.
|
||||
store.cold_db.sync()?;
|
||||
|
||||
{
|
||||
let mut split_guard = store.split.write();
|
||||
let latest_split_slot = split_guard.slot;
|
||||
|
||||
// Detect a sitation where the split point is (erroneously) changed from more than one
|
||||
// place in code.
|
||||
if latest_split_slot != current_split_slot {
|
||||
error!(
|
||||
store.log,
|
||||
"Race condition detected: Split point changed while moving states to the freezer";
|
||||
"previous split slot" => current_split_slot,
|
||||
"current split slot" => latest_split_slot,
|
||||
);
|
||||
|
||||
// Assume the freezing procedure will be retried in case this happens.
|
||||
return Err(Error::SplitPointModified(
|
||||
current_split_slot,
|
||||
latest_split_slot,
|
||||
));
|
||||
}
|
||||
|
||||
// Before updating the in-memory split value, we flush it to disk first, so that should the
|
||||
// OS process die at this point, we pick up from the right place after a restart.
|
||||
let split = Split {
|
||||
slot: frozen_head.slot,
|
||||
state_root: frozen_head_root,
|
||||
};
|
||||
store
|
||||
.hot_db
|
||||
.put_sync(&Hash256::from_slice(SPLIT_DB_KEY.as_bytes()), &split)?;
|
||||
|
||||
// Split point is now persisted in the hot database on disk. The in-memory split point
|
||||
// hasn't been modified elsewhere since we keep a write lock on it. It's safe to update
|
||||
// the in-memory split point now.
|
||||
*split_guard = split;
|
||||
}
|
||||
|
||||
// Delete the states from the hot database if we got this far.
|
||||
store.do_atomically(hot_db_ops)?;
|
||||
|
||||
debug!(
|
||||
store.log,
|
||||
"Freezer migration complete";
|
||||
@ -842,7 +876,7 @@ pub fn process_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
|
||||
/// Struct for storing the split slot and state root in the database.
|
||||
#[derive(Debug, Clone, Copy, Default, Encode, Decode)]
|
||||
struct Split {
|
||||
pub struct Split {
|
||||
slot: Slot,
|
||||
state_root: Hash256,
|
||||
}
|
||||
|
@ -37,9 +37,49 @@ impl<E: EthSpec> LevelDB<E> {
|
||||
fn write_options(&self) -> WriteOptions {
|
||||
WriteOptions::new()
|
||||
}
|
||||
|
||||
fn write_options_sync(&self) -> WriteOptions {
|
||||
let mut opts = WriteOptions::new();
|
||||
opts.sync = true;
|
||||
opts
|
||||
}
|
||||
|
||||
fn put_bytes_with_options(
|
||||
&self,
|
||||
col: &str,
|
||||
key: &[u8],
|
||||
val: &[u8],
|
||||
opts: WriteOptions,
|
||||
) -> Result<(), Error> {
|
||||
let column_key = get_key_for_col(col, key);
|
||||
|
||||
metrics::inc_counter(&metrics::DISK_DB_WRITE_COUNT);
|
||||
metrics::inc_counter_by(&metrics::DISK_DB_WRITE_BYTES, val.len() as i64);
|
||||
let timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES);
|
||||
|
||||
self.db
|
||||
.put(opts, BytesKey::from_vec(column_key), val)
|
||||
.map_err(Into::into)
|
||||
.map(|()| {
|
||||
metrics::stop_timer(timer);
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
|
||||
/// Store some `value` in `column`, indexed with `key`.
|
||||
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
|
||||
self.put_bytes_with_options(col, key, val, self.write_options())
|
||||
}
|
||||
|
||||
fn put_bytes_sync(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
|
||||
self.put_bytes_with_options(col, key, val, self.write_options_sync())
|
||||
}
|
||||
|
||||
fn sync(&self) -> Result<(), Error> {
|
||||
self.put_bytes_sync("sync", "sync".as_bytes(), "sync".as_bytes())
|
||||
}
|
||||
|
||||
/// Retrieve some bytes in `column` with `key`.
|
||||
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
|
||||
let column_key = get_key_for_col(col, key);
|
||||
@ -59,22 +99,6 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Store some `value` in `column`, indexed with `key`.
|
||||
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
|
||||
let column_key = get_key_for_col(col, key);
|
||||
|
||||
metrics::inc_counter(&metrics::DISK_DB_WRITE_COUNT);
|
||||
metrics::inc_counter_by(&metrics::DISK_DB_WRITE_BYTES, val.len() as i64);
|
||||
let timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES);
|
||||
|
||||
self.db
|
||||
.put(self.write_options(), BytesKey::from_vec(column_key), val)
|
||||
.map_err(Into::into)
|
||||
.map(|()| {
|
||||
metrics::stop_timer(timer);
|
||||
})
|
||||
}
|
||||
|
||||
/// Return `true` if `key` exists in `column`.
|
||||
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
|
||||
let column_key = get_key_for_col(col, key);
|
||||
|
@ -27,7 +27,7 @@ pub mod iter;
|
||||
use std::borrow::Cow;
|
||||
|
||||
pub use self::config::StoreConfig;
|
||||
pub use self::hot_cold_store::{HotColdDB, HotStateSummary};
|
||||
pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split};
|
||||
pub use self::leveldb_store::LevelDB;
|
||||
pub use self::memory_store::MemoryStore;
|
||||
pub use self::partial_beacon_state::PartialBeaconState;
|
||||
@ -43,6 +43,14 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
|
||||
/// Store some `value` in `column`, indexed with `key`.
|
||||
fn put_bytes(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error>;
|
||||
|
||||
/// Same as put_bytes() but also force a flush to disk
|
||||
fn put_bytes_sync(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error>;
|
||||
|
||||
/// Flush to disk. See
|
||||
/// https://chromium.googlesource.com/external/leveldb/+/HEAD/doc/index.md#synchronous-writes
|
||||
/// for details.
|
||||
fn sync(&self) -> Result<(), Error>;
|
||||
|
||||
/// Return `true` if `key` exists in `column`.
|
||||
fn key_exists(&self, column: &str, key: &[u8]) -> Result<bool, Error>;
|
||||
|
||||
@ -74,6 +82,14 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
fn put_sync<I: StoreItem>(&self, key: &Hash256, item: &I) -> Result<(), Error> {
|
||||
let column = I::db_column().into();
|
||||
let key = key.as_bytes();
|
||||
|
||||
self.put_bytes_sync(column, key, &item.as_store_bytes())
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Retrieve an item from `Self`.
|
||||
fn get<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error> {
|
||||
let column = I::db_column().into();
|
||||
|
@ -51,6 +51,15 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_bytes_sync(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
|
||||
self.put_bytes(col, key, val)
|
||||
}
|
||||
|
||||
fn sync(&self) -> Result<(), Error> {
|
||||
// no-op
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return true if some key exists in some column.
|
||||
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
|
||||
let column_key = Self::get_key_for_col(col, key);
|
||||
|
Loading…
Reference in New Issue
Block a user