From c7f47af9fbbd842333b303644be4dccb87fd141b Mon Sep 17 00:00:00 2001
From: Adam Szkoda
Date: Fri, 3 Jul 2020 01:47:31 +0200
Subject: [PATCH] Harden the freezing procedure against failures (#1323)

* Enable logging in tests

* Migrate states to the freezer atomically
---
 Cargo.lock                                    |   1 +
 beacon_node/beacon_chain/Cargo.toml           |   1 +
 beacon_node/beacon_chain/tests/store_tests.rs |  12 ++-
 beacon_node/store/src/errors.rs               |   3 +-
 beacon_node/store/src/hot_cold_store.rs       | 100 ++++++++++++------
 beacon_node/store/src/leveldb_store.rs        |  56 +++++++---
 beacon_node/store/src/lib.rs                  |  18 +++-
 beacon_node/store/src/memory_store.rs         |   9 ++
 8 files changed, 147 insertions(+), 53 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b49fde6c3..085714159 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -359,6 +359,7 @@ dependencies = [
  "serde_json",
  "serde_yaml",
  "slog",
+ "slog-term",
  "sloggers",
  "slot_clock",
  "smallvec 1.4.0",

diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml
index 2109f882b..cd826826e 100644
--- a/beacon_node/beacon_chain/Cargo.toml
+++ b/beacon_node/beacon_chain/Cargo.toml
@@ -25,6 +25,7 @@ serde_derive = "1.0.110"
 serde_yaml = "0.8.11"
 serde_json = "1.0.52"
 slog = { version = "2.5.2", features = ["max_level_trace"] }
+slog-term = "2.6.0"
 sloggers = "1.0.0"
 slot_clock = { path = "../../common/slot_clock" }
 eth2_hashing = "0.1.0"

diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index c5207b321..6ab2908bc 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -3,6 +3,11 @@
 #[macro_use]
 extern crate lazy_static;
 
+#[macro_use]
+extern crate slog;
+extern crate slog_term;
+
+use crate::slog::Drain;
 use beacon_chain::attestation_verification::Error as AttnError;
 use beacon_chain::test_utils::{
     AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType,
@@ -10,7 +15,6 @@ use beacon_chain::test_utils::{
 use beacon_chain::BeaconSnapshot;
 use beacon_chain::StateSkipConfig;
 use rand::Rng;
-use sloggers::{null::NullLoggerBuilder, Build};
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::sync::Arc;
@@ -40,7 +44,11 @@ fn get_store(db_path: &TempDir) -> Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>> {
     let hot_path = db_path.path().join("hot_db");
     let cold_path = db_path.path().join("cold_db");
     let config = StoreConfig::default();
-    let log = NullLoggerBuilder.build().expect("logger should build");
+
+    let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter);
+    let drain = slog_term::FullFormat::new(decorator).build();
+    let log = slog::Logger::root(std::sync::Mutex::new(drain).fuse(), o!());
+
     Arc::new(
         HotColdDB::open(&hot_path, &cold_path, config, spec, log)
             .expect("disk store should initialize"),

diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs
index 4512ed70d..8e9237361 100644
--- a/beacon_node/store/src/errors.rs
+++ b/beacon_node/store/src/errors.rs
@@ -1,7 +1,7 @@
 use crate::chunked_vector::ChunkError;
 use crate::hot_cold_store::HotColdDBError;
 use ssz::DecodeError;
-use types::{BeaconStateError, Hash256};
+use types::{BeaconStateError, Hash256, Slot};
 
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -16,6 +16,7 @@ pub enum Error {
     RlpError(String),
     BlockNotFound(Hash256),
     NoContinuationData,
+    SplitPointModified(Slot, Slot),
 }
 
 impl From<DecodeError> for Error {
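The test-logger change above swaps the old `NullLoggerBuilder` for a drain that writes through `slog_term::TestStdoutWriter`, so log output is captured by the Rust test harness and shown only for failing tests. For reference, here is the same setup distilled into a standalone sketch; the `test_logger` helper name is ours, not part of the patch, and the crate versions are the ones from the `Cargo.toml` hunk:

```rust
use slog::{info, o, Drain};

fn test_logger() -> slog::Logger {
    // `TestStdoutWriter` prints via the test harness, so `cargo test`
    // captures output per-test and only echoes it on failure.
    let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter);
    let drain = slog_term::FullFormat::new(decorator).build();
    // `FullFormat` is not `Sync`, so wrap it in a `Mutex` before `fuse()`.
    slog::Logger::root(std::sync::Mutex::new(drain).fuse(), o!())
}

fn main() {
    let log = test_logger();
    info!(log, "freezer migration complete"; "slot" => 64);
}
```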
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 0d96be25e..48917af73 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -14,7 +14,7 @@ use crate::{
 };
 use lru::LruCache;
 use parking_lot::{Mutex, RwLock};
-use slog::{debug, trace, warn, Logger};
+use slog::{debug, error, trace, warn, Logger};
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode, Encode};
 use state_processing::{
@@ -676,13 +676,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         Ok(split)
     }
 
-    /// Store the split point on disk.
-    fn store_split(&self) -> Result<(), Error> {
-        let key = Hash256::from_slice(SPLIT_DB_KEY.as_bytes());
-        self.hot_db.put(&key, &*self.split.read())?;
-        Ok(())
-    }
-
     /// Load the state root of a restore point.
     fn load_restore_point_hash(&self, restore_point_index: u64) -> Result<Hash256, Error> {
         let key = Self::restore_point_key(restore_point_index);
@@ -717,13 +710,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             .map(|s: ColdStateSummary| s.slot))
     }
 
-    /// Store the slot of a frozen state.
-    fn store_cold_state_slot(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
-        self.cold_db
-            .put(state_root, &ColdStateSummary { slot })
-            .map_err(Into::into)
-    }
-
     /// Load a hot state's summary, given its root.
     pub fn load_hot_state_summary(
         &self,
@@ -778,7 +764,7 @@ pub fn process_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
     // 0. Check that the migration is sensible.
     // The new frozen head must increase the current split slot, and lie on an epoch
     // boundary (in order for the hot state summary scheme to work).
-    let current_split_slot = store.get_split_slot();
+    let current_split_slot = store.split.read().slot;
 
     if frozen_head.slot < current_split_slot {
         return Err(HotColdDBError::FreezeSlotError {
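Step 0 above boils down to two guards on the proposed new split point: it must not move backwards, and it must sit on an epoch boundary. A minimal, self-contained sketch of those guards, using plain `u64` slots and a stand-in error type instead of the real `Slot` and `HotColdDBError`:

```rust
const SLOTS_PER_EPOCH: u64 = 32; // stand-in for E::slots_per_epoch()

#[derive(Debug, PartialEq)]
enum FreezeError {
    // The new split point would move backwards.
    FreezeSlotError {
        current_split_slot: u64,
        proposed_split_slot: u64,
    },
    // The new split point is not on an epoch boundary.
    FreezeSlotUnaligned(u64),
}

fn check_frozen_head(current_split_slot: u64, frozen_head_slot: u64) -> Result<(), FreezeError> {
    // The new frozen head must increase the current split slot...
    if frozen_head_slot < current_split_slot {
        return Err(FreezeError::FreezeSlotError {
            current_split_slot,
            proposed_split_slot: frozen_head_slot,
        });
    }
    // ...and lie on an epoch boundary, so the hot state summary scheme keeps working.
    if frozen_head_slot % SLOTS_PER_EPOCH != 0 {
        return Err(FreezeError::FreezeSlotUnaligned(frozen_head_slot));
    }
    Ok(())
}

fn main() {
    assert!(check_frozen_head(32, 64).is_ok());
    assert_eq!(
        check_frozen_head(64, 32),
        Err(FreezeError::FreezeSlotError {
            current_split_slot: 64,
            proposed_split_slot: 32,
        })
    );
    assert_eq!(
        check_frozen_head(32, 33),
        Err(FreezeError::FreezeSlotUnaligned(33))
    );
}
```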
@@ -792,45 +778,93 @@ pub fn process_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
         return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot).into());
     }
 
+    let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
+
     // 1. Copy all of the states between the head and the split slot, from the hot DB
     // to the cold DB.
     let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head);
-
-    let mut to_delete = vec![];
     for maybe_pair in state_root_iter.take_while(|result| match result {
         Ok((_, slot)) => slot >= &current_split_slot,
         Err(_) => true,
     }) {
         let (state_root, slot) = maybe_pair?;
+
+        let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();
+
         if slot % store.config.slots_per_restore_point == 0 {
             let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root)?
                 .ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?;
-            let mut ops: Vec<KeyValueStoreOp> = Vec::new();
-            store.store_cold_state(&state_root, &state, &mut ops)?;
-            store.cold_db.do_atomically(ops)?;
+            store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
         }
 
         // Store a pointer from this state root to its slot, so we can later reconstruct states
         // from their state root alone.
-        store.store_cold_state_slot(&state_root, slot)?;
+        let cold_state_summary = ColdStateSummary { slot };
+        let op = cold_state_summary.as_kv_store_op(state_root);
+        cold_db_ops.push(op);
+
+        // There are data dependencies between calls to `store_cold_state()` that prevent us from
+        // doing one big call to `store.cold_db.do_atomically()` at the end of the loop.
+        store.cold_db.do_atomically(cold_db_ops)?;
 
         // Delete the old summary, and the full state if we lie on an epoch boundary.
-        to_delete.push((state_root, slot));
+        hot_db_ops.push(StoreOp::DeleteState(state_root.into(), slot));
     }
 
-    // 2. Update the split slot
-    *store.split.write() = Split {
-        slot: frozen_head.slot,
-        state_root: frozen_head_root,
-    };
-    store.store_split()?;
+    // Warning: Critical section. We have to take care not to put either of the two databases in
+    // an inconsistent state if the OS process dies at any point during the freezing procedure.
+    //
+    // Since it is pretty much impossible to be atomic across more than one database, we trade
+    // losing track of states to delete for consistency. In other words: we should be safe to die
+    // at any point below, but it may happen that some states won't be deleted from the hot
+    // database and will remain there forever. Since dying in these particular few lines should
+    // be an exceedingly rare event, this is an acceptable tradeoff.
 
-    // 3. Delete from the hot DB
-    for (state_root, slot) in to_delete {
-        store.delete_state(&state_root, slot)?;
+    // Flush to disk all the states that have just been migrated to the cold store.
+    store.cold_db.sync()?;
+
+    {
+        let mut split_guard = store.split.write();
+        let latest_split_slot = split_guard.slot;
+
+        // Detect a situation where the split point is (erroneously) changed from more than one
+        // place in code.
+        if latest_split_slot != current_split_slot {
+            error!(
+                store.log,
+                "Race condition detected: Split point changed while moving states to the freezer";
+                "previous split slot" => current_split_slot,
+                "current split slot" => latest_split_slot,
+            );
+
+            // Assume the freezing procedure will be retried in case this happens.
+            return Err(Error::SplitPointModified(
+                current_split_slot,
+                latest_split_slot,
+            ));
+        }
+
+        // Before updating the in-memory split value, we flush it to disk first, so that should
+        // the OS process die at this point, we pick up from the right place after a restart.
+        let split = Split {
+            slot: frozen_head.slot,
+            state_root: frozen_head_root,
+        };
+        store
+            .hot_db
+            .put_sync(&Hash256::from_slice(SPLIT_DB_KEY.as_bytes()), &split)?;
+
+        // The split point is now persisted in the hot database on disk. The in-memory split
+        // point hasn't been modified elsewhere, since we hold a write lock on it, so it is safe
+        // to update it now.
+        *split_guard = split;
     }
 
+    // Delete the states from the hot database if we got this far.
+    store.do_atomically(hot_db_ops)?;
+
     debug!(
         store.log,
         "Freezer migration complete";
@@ -842,7 +876,7 @@ pub fn process_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
 
 /// Struct for storing the split slot and state root in the database.
 #[derive(Debug, Clone, Copy, Default, Encode, Decode)]
-struct Split {
+pub struct Split {
     slot: Slot,
     state_root: Hash256,
 }
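The heart of the change is the write ordering in that critical section: batch the cold writes, `sync()` the cold DB, persist the split point with a synchronous write while holding the in-memory lock, and only then delete from the hot DB. A toy model of the same ordering, where the `Db` type and `freeze` function are illustrative stand-ins rather than the real store API:

```rust
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

/// Toy key-value store; `put_sync` marks where a real store would fsync.
#[derive(Default)]
struct Db {
    data: Mutex<HashMap<String, String>>,
}

impl Db {
    fn put(&self, k: &str, v: &str) {
        self.data.lock().unwrap().insert(k.to_string(), v.to_string());
    }
    fn put_sync(&self, k: &str, v: &str) {
        self.put(k, v); // a real store would force a flush to disk here
    }
    fn delete(&self, k: &str) {
        self.data.lock().unwrap().remove(k);
    }
    fn contains(&self, k: &str) -> bool {
        self.data.lock().unwrap().contains_key(k)
    }
}

fn freeze(hot: &Db, cold: &Db, split: &RwLock<u64>, new_split: u64, states: &[(String, String)]) {
    // 1. Copy the states to the cold DB (batched per state in the real code).
    for (root, state) in states {
        cold.put(root, state);
    }
    // 2. A real implementation calls `cold_db.sync()` here, so the cold writes
    //    are durable before the split point moves.

    // 3. Persist the new split point with a synchronous write, then update the
    //    in-memory copy, all under the write lock.
    let mut guard = split.write().unwrap();
    hot.put_sync("split", &new_split.to_string());
    *guard = new_split;
    drop(guard);

    // 4. Only now delete the migrated states from the hot DB. A crash before
    //    this point leaks stale hot states, but never loses data.
    for (root, _) in states {
        hot.delete(root);
    }
}

fn main() {
    let (hot, cold) = (Db::default(), Db::default());
    let split = RwLock::new(0);
    hot.put("root_a", "state_a");
    freeze(&hot, &cold, &split, 32, &[("root_a".to_string(), "state_a".to_string())]);
    assert_eq!(*split.read().unwrap(), 32);
    assert!(cold.contains("root_a") && !hot.contains("root_a"));
}
```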
diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs
index 4a7822b85..4166c7a1a 100644
--- a/beacon_node/store/src/leveldb_store.rs
+++ b/beacon_node/store/src/leveldb_store.rs
@@ -37,9 +37,49 @@ impl<E: EthSpec> LevelDB<E> {
     fn write_options(&self) -> WriteOptions {
         WriteOptions::new()
     }
+
+    fn write_options_sync(&self) -> WriteOptions {
+        let mut opts = WriteOptions::new();
+        opts.sync = true;
+        opts
+    }
+
+    fn put_bytes_with_options(
+        &self,
+        col: &str,
+        key: &[u8],
+        val: &[u8],
+        opts: WriteOptions,
+    ) -> Result<(), Error> {
+        let column_key = get_key_for_col(col, key);
+
+        metrics::inc_counter(&metrics::DISK_DB_WRITE_COUNT);
+        metrics::inc_counter_by(&metrics::DISK_DB_WRITE_BYTES, val.len() as i64);
+        let timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES);
+
+        self.db
+            .put(opts, BytesKey::from_vec(column_key), val)
+            .map_err(Into::into)
+            .map(|()| {
+                metrics::stop_timer(timer);
+            })
+    }
 }
 
 impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
+    /// Store some `value` in `column`, indexed with `key`.
+    fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
+        self.put_bytes_with_options(col, key, val, self.write_options())
+    }
+
+    fn put_bytes_sync(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
+        self.put_bytes_with_options(col, key, val, self.write_options_sync())
+    }
+
+    fn sync(&self) -> Result<(), Error> {
+        self.put_bytes_sync("sync", "sync".as_bytes(), "sync".as_bytes())
+    }
+
     /// Retrieve some bytes in `column` with `key`.
     fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
         let column_key = get_key_for_col(col, key);
@@ -59,22 +99,6 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
             })
     }
 
-    /// Store some `value` in `column`, indexed with `key`.
-    fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
-        let column_key = get_key_for_col(col, key);
-
-        metrics::inc_counter(&metrics::DISK_DB_WRITE_COUNT);
-        metrics::inc_counter_by(&metrics::DISK_DB_WRITE_BYTES, val.len() as i64);
-        let timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES);
-
-        self.db
-            .put(self.write_options(), BytesKey::from_vec(column_key), val)
-            .map_err(Into::into)
-            .map(|()| {
-                metrics::stop_timer(timer);
-            })
-    }
-
     /// Return `true` if `key` exists in `column`.
     fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
         let column_key = get_key_for_col(col, key);

diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index f9f04b884..02e314dc6 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -27,7 +27,7 @@ pub mod iter;
 use std::borrow::Cow;
 
 pub use self::config::StoreConfig;
-pub use self::hot_cold_store::{HotColdDB, HotStateSummary};
+pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split};
 pub use self::leveldb_store::LevelDB;
 pub use self::memory_store::MemoryStore;
 pub use self::partial_beacon_state::PartialBeaconState;
@@ -43,6 +43,14 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
     /// Store some `value` in `column`, indexed with `key`.
     fn put_bytes(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error>;
 
+    /// Same as `put_bytes()`, but also forces a flush to disk.
+    fn put_bytes_sync(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error>;
+
+    /// Flush to disk. See
+    /// https://chromium.googlesource.com/external/leveldb/+/HEAD/doc/index.md#synchronous-writes
+    /// for details.
+    fn sync(&self) -> Result<(), Error>;
+
     /// Return `true` if `key` exists in `column`.
     fn key_exists(&self, column: &str, key: &[u8]) -> Result<bool, Error>;
 
@@ -74,6 +82,14 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'static
             .map_err(Into::into)
     }
 
+    fn put_sync<I: StoreItem>(&self, key: &Hash256, item: &I) -> Result<(), Error> {
+        let column = I::db_column().into();
+        let key = key.as_bytes();
+
+        self.put_bytes_sync(column, key, &item.as_store_bytes())
+            .map_err(Into::into)
+    }
+
     /// Retrieve an item from `Self`.
     fn get<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error> {
         let column = I::db_column().into();
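The two new `KeyValueStore` methods separate ordinary buffered writes from durable ones. LevelDB exposes durability only through synchronous writes (see the doc link in the trait comment), which is why `LevelDB::sync()` above is implemented as a synchronous write of a dummy `"sync"` key: that write acts as a barrier that flushes everything written before it. A toy file-backed analogue of the same contract, where `FileStore` is hypothetical and fsync is spelled out via `File::sync_all`:

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;

struct FileStore {
    file: File,
}

impl FileStore {
    fn open(path: &Path) -> io::Result<Self> {
        Ok(Self {
            file: OpenOptions::new().create(true).append(true).open(path)?,
        })
    }

    // Buffered write: handed to the OS, but possibly lost on a crash.
    fn put_bytes(&mut self, column: &str, key: &[u8], value: &[u8]) -> io::Result<()> {
        writeln!(self.file, "{}/{:02x?} = {:02x?}", column, key, value)
    }

    // Same write, but only returns once the bytes are on disk.
    fn put_bytes_sync(&mut self, column: &str, key: &[u8], value: &[u8]) -> io::Result<()> {
        self.put_bytes(column, key, value)?;
        self.sync()
    }

    // Durability barrier: everything written before this call is on disk after
    // it returns. LevelDB has no standalone flush call, which is why the patch
    // implements `sync()` as a synchronous write of a dummy "sync" key instead.
    fn sync(&mut self) -> io::Result<()> {
        self.file.sync_all()
    }
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("toy_store.log");
    let mut store = FileStore::open(&path)?;
    store.put_bytes("state", b"root_a", b"state_a")?;
    store.sync()?; // barrier: the write above is now durable
    store.put_bytes_sync("split", b"slot", b"32")?;
    Ok(())
}
```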
diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs
index 30a0b1e0b..19308c86d 100644
--- a/beacon_node/store/src/memory_store.rs
+++ b/beacon_node/store/src/memory_store.rs
@@ -51,6 +51,15 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
         Ok(())
     }
 
+    fn put_bytes_sync(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
+        self.put_bytes(col, key, val)
+    }
+
+    fn sync(&self) -> Result<(), Error> {
+        // no-op
+        Ok(())
+    }
+
     /// Return true if some key exists in some column.
     fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
        let column_key = Self::get_key_for_col(col, key);
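Finally, the new `Error::SplitPointModified(Slot, Slot)` variant is returned when the split point moves while states are being migrated; the comment in `process_finalization` assumes the caller will simply retry in that case. A sketch of what such a caller could look like; the `run_migration` helper and the retry loop are hypothetical, not part of this patch:

```rust
type Slot = u64;

#[derive(Debug)]
enum Error {
    SplitPointModified(Slot, Slot),
    RetriesExhausted,
}

// Stand-in for `process_finalization`: fails once with `SplitPointModified`
// to exercise the retry path, then succeeds.
fn run_migration(attempt: usize) -> Result<(), Error> {
    if attempt == 0 {
        Err(Error::SplitPointModified(32, 64))
    } else {
        Ok(())
    }
}

fn migrate_with_retry(max_attempts: usize) -> Result<(), Error> {
    for attempt in 0..max_attempts {
        match run_migration(attempt) {
            // The split point moved under us before it was persisted, so the
            // procedure can simply be re-run against the new split point.
            Err(Error::SplitPointModified(expected, actual)) => eprintln!(
                "split point moved from slot {} to {}; retrying ({}/{})",
                expected,
                actual,
                attempt + 1,
                max_attempts
            ),
            other => return other,
        }
    }
    Err(Error::RetriesExhausted)
}

fn main() {
    migrate_with_retry(3).expect("migration should succeed on retry");
}
```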