From 2f456ff9eb15d9767564c812382d701c6cf9f5d6 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 13 Feb 2023 03:32:01 +0000 Subject: [PATCH] Fix regression in DB write atomicity (#3931) ## Issue Addressed Fix a bug introduced by #3696. The bug is not expected to occur frequently, so releasing this PR is non-urgent. ## Proposed Changes * Add a variant to `StoreOp` that allows a raw KV operation to be passed around. * Return to using `self.store.do_atomically` rather than `self.store.hot_db.do_atomically`. This streamlines the write back into a single call and makes our auto-revert work again. * Prevent `import_block_update_shuffling_cache` from failing block import. This is an outstanding bug from before v3.4.0 which may have contributed to some random unexplained database corruption. ## Additional Info In #3696 I split the database write into two calls, one to convert the `StoreOp`s to `KeyValueStoreOp`s and one to write them. This had the unfortunate side-effect of damaging our atomicity guarantees in case of a write error. If the first call failed, we would be left with the block in fork choice but not on-disk (or the snapshot cache), which would prevent us from processing any descendant blocks. On `unstable` the first call is very unlikely to fail unless the disk is full, but on `tree-states` the conversion is more involved and a user reported database corruption after it failed in a way that should have been recoverable. Additionally, as @emhane observed, #3696 also inadvertently removed the import of the new block into the block cache. Although this seems like it could have negatively impacted performance, there are several mitigating factors: - For regular block processing we should almost always load the parent block (and state) from the snapshot cache. - We often load blinded blocks, which bypass the block cache anyway. - Metrics show no noticeable increase in the block cache miss rate with v3.4.0. However, I expect the block cache _will_ be useful again in `tree-states`, so it is restored to use by this PR. --- beacon_node/beacon_chain/src/beacon_chain.rs | 40 ++++++++++++++----- .../src/validator_pubkey_cache.rs | 17 +++++--- beacon_node/store/src/hot_cold_store.rs | 6 +++ beacon_node/store/src/lib.rs | 1 + 4 files changed, 47 insertions(+), 17 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6a67ae71e..4ec13f8f5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2714,7 +2714,7 @@ impl BeaconChain { // is so we don't have to think about lock ordering with respect to the fork choice lock. // There are a bunch of places where we lock both fork choice and the pubkey cache and it // would be difficult to check that they all lock fork choice first. - let mut kv_store_ops = self + let mut ops = self .validator_pubkey_cache .try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) .ok_or(Error::ValidatorPubkeyCacheLockTimeout)? @@ -2816,9 +2816,14 @@ impl BeaconChain { // ---------------------------- BLOCK PROBABLY ATTESTABLE ---------------------------------- // Most blocks are now capable of being attested to thanks to the `early_attester_cache` // cache above. Resume non-essential processing. + // + // It is important NOT to return errors here before the database commit, because the block + // has already been added to fork choice and the database would be left in an inconsistent + // state if we returned early without committing. In other words, an error here would + // corrupt the node's database permanently. // ----------------------------------------------------------------------------------------- - self.import_block_update_shuffling_cache(block_root, &mut state)?; + self.import_block_update_shuffling_cache(block_root, &mut state); self.import_block_observe_attestations( block, &state, @@ -2841,17 +2846,16 @@ impl BeaconChain { // If the write fails, revert fork choice to the version from disk, else we can // end up with blocks in fork choice that are missing from disk. // See https://github.com/sigp/lighthouse/issues/2028 - let mut ops: Vec<_> = confirmed_state_roots - .into_iter() - .map(StoreOp::DeleteStateTemporaryFlag) - .collect(); + ops.extend( + confirmed_state_roots + .into_iter() + .map(StoreOp::DeleteStateTemporaryFlag), + ); ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutState(block.state_root(), &state)); let txn_lock = self.store.hot_db.begin_rw_transaction(); - kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?); - - if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) { + if let Err(e) = self.store.do_atomically(ops) { error!( self.log, "Database write failed!"; @@ -3280,13 +3284,27 @@ impl BeaconChain { } } + // For the current and next epoch of this state, ensure we have the shuffling from this + // block in our cache. fn import_block_update_shuffling_cache( &self, block_root: Hash256, state: &mut BeaconState, + ) { + if let Err(e) = self.import_block_update_shuffling_cache_fallible(block_root, state) { + warn!( + self.log, + "Failed to prime shuffling cache"; + "error" => ?e + ); + } + } + + fn import_block_update_shuffling_cache_fallible( + &self, + block_root: Hash256, + state: &mut BeaconState, ) -> Result<(), BlockError> { - // For the current and next epoch of this state, ensure we have the shuffling from this - // block in our cache. for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] { let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?; diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs index 26aea2d27..79910df29 100644 --- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs +++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use std::collections::HashMap; use std::convert::TryInto; use std::marker::PhantomData; -use store::{DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreItem}; +use store::{DBColumn, Error as StoreError, StoreItem, StoreOp}; use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes}; /// Provides a mapping of `validator_index -> validator_publickey`. @@ -38,7 +38,7 @@ impl ValidatorPubkeyCache { }; let store_ops = cache.import_new_pubkeys(state)?; - store.hot_db.do_atomically(store_ops)?; + store.do_atomically(store_ops)?; Ok(cache) } @@ -79,7 +79,7 @@ impl ValidatorPubkeyCache { pub fn import_new_pubkeys( &mut self, state: &BeaconState, - ) -> Result, BeaconChainError> { + ) -> Result>, BeaconChainError> { if state.validators().len() > self.pubkeys.len() { self.import( state.validators()[self.pubkeys.len()..] @@ -92,7 +92,10 @@ impl ValidatorPubkeyCache { } /// Adds zero or more validators to `self`. - fn import(&mut self, validator_keys: I) -> Result, BeaconChainError> + fn import( + &mut self, + validator_keys: I, + ) -> Result>, BeaconChainError> where I: Iterator + ExactSizeIterator, { @@ -112,7 +115,9 @@ impl ValidatorPubkeyCache { // It will be committed atomically when the block that introduced it is written to disk. // Notably it is NOT written while the write lock on the cache is held. // See: https://github.com/sigp/lighthouse/issues/2327 - store_ops.push(DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i))); + store_ops.push(StoreOp::KeyValueOp( + DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)), + )); self.pubkeys.push( (&pubkey) @@ -294,7 +299,7 @@ mod test { let ops = cache .import_new_pubkeys(&state) .expect("should import pubkeys"); - store.hot_db.do_atomically(ops).unwrap(); + store.do_atomically(ops).unwrap(); check_cache_get(&cache, &keypairs[..]); drop(cache); diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 4f63f4e7f..6028d0ddc 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -727,6 +727,10 @@ impl, Cold: ItemStore> HotColdDB let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_bytes()); key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); } + + StoreOp::KeyValueOp(kv_op) => { + key_value_batch.push(kv_op); + } } } Ok(key_value_batch) @@ -758,6 +762,8 @@ impl, Cold: ItemStore> HotColdDB StoreOp::DeleteState(_, _) => (), StoreOp::DeleteExecutionPayload(_) => (), + + StoreOp::KeyValueOp(_) => (), } } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 75aeca058..9d15dd404 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -161,6 +161,7 @@ pub enum StoreOp<'a, E: EthSpec> { DeleteBlock(Hash256), DeleteState(Hash256, Option), DeleteExecutionPayload(Hash256), + KeyValueOp(KeyValueStoreOp), } /// A unique column identifier.