Fix regression in DB write atomicity

This commit is contained in:
Michael Sproul 2023-01-30 17:55:20 +11:00 committed by Emilia Hane
parent 9d919917f5
commit ac4b5b580c
No known key found for this signature in database
GPG Key ID: E73394F9C09206FA
4 changed files with 44 additions and 21 deletions

View File

@ -2878,7 +2878,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// is so we don't have to think about lock ordering with respect to the fork choice lock.
// There are a bunch of places where we lock both fork choice and the pubkey cache and it
// would be difficult to check that they all lock fork choice first.
let mut kv_store_ops = self let mut ops = self
.validator_pubkey_cache
.try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?
@ -2981,9 +2981,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// ---------------------------- BLOCK PROBABLY ATTESTABLE ----------------------------------
// Most blocks are now capable of being attested to thanks to the `early_attester_cache`
// cache above. Resume non-essential processing.
//
// It is important NOT to return errors here before the database commit, because the block
// has already been added to fork choice and the database would be left in an inconsistent
// state if we returned early without committing. In other words, an error here would
// corrupt the node's database permanently.
// -----------------------------------------------------------------------------------------
self.import_block_update_shuffling_cache(block_root, &mut state)?; self.import_block_update_shuffling_cache(block_root, &mut state);
self.import_block_observe_attestations(
block,
&state,
@ -3008,10 +3013,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// See https://github.com/sigp/lighthouse/issues/2028
let (signed_block, blobs) = signed_block.deconstruct();
let block = signed_block.message();
let mut ops: Vec<_> = confirmed_state_roots ops.extend(
.into_iter() confirmed_state_roots
.map(StoreOp::DeleteStateTemporaryFlag) .into_iter()
.collect(); .map(StoreOp::DeleteStateTemporaryFlag),
);
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
ops.push(StoreOp::PutState(block.state_root(), &state));
@ -3036,9 +3042,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let txn_lock = self.store.hot_db.begin_rw_transaction();
kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?); if let Err(e) = self.store.do_atomically(ops) {
if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) {
error!(
self.log,
"Database write failed!";
@ -3467,13 +3471,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
fn import_block_update_shuffling_cache( fn import_block_update_shuffling_cache(
&self,
block_root: Hash256,
state: &mut BeaconState<T::EthSpec>,
) {
if let Err(e) = self.import_block_update_shuffling_cache_fallible(block_root, state) {
warn!(
self.log,
"Failed to prime shuffling cache";
"error" => ?e
);
}
}
fn import_block_update_shuffling_cache_fallible(
&self,
block_root: Hash256,
state: &mut BeaconState<T::EthSpec>,
) -> Result<(), BlockError<T::EthSpec>> { ) -> Result<(), BlockError<T::EthSpec>> {
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;

View File

@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
use std::collections::HashMap;
use std::convert::TryInto;
use std::marker::PhantomData;
use store::{DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreItem}; use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};
/// Provides a mapping of `validator_index -> validator_publickey`.
@ -38,7 +38,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
};
let store_ops = cache.import_new_pubkeys(state)?;
store.hot_db.do_atomically(store_ops)?; store.do_atomically(store_ops)?;
Ok(cache)
}
@ -79,7 +79,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
pub fn import_new_pubkeys(
&mut self,
state: &BeaconState<T::EthSpec>,
) -> Result<Vec<KeyValueStoreOp>, BeaconChainError> { ) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError> {
if state.validators().len() > self.pubkeys.len() {
self.import(
state.validators()[self.pubkeys.len()..]
@ -92,7 +92,10 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
}
/// Adds zero or more validators to `self`.
fn import<I>(&mut self, validator_keys: I) -> Result<Vec<KeyValueStoreOp>, BeaconChainError> fn import<I>(
&mut self,
validator_keys: I,
) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError>
where
I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator,
{
@ -112,7 +115,9 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
// It will be committed atomically when the block that introduced it is written to disk.
// Notably it is NOT written while the write lock on the cache is held.
// See: https://github.com/sigp/lighthouse/issues/2327
store_ops.push(DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i))); store_ops.push(StoreOp::KeyValueOp(
DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)),
));
self.pubkeys.push(
(&pubkey)
@ -294,7 +299,7 @@ mod test {
let ops = cache
.import_new_pubkeys(&state)
.expect("should import pubkeys");
store.hot_db.do_atomically(ops).unwrap(); store.do_atomically(ops).unwrap();
check_cache_get(&cache, &keypairs[..]);
drop(cache);

View File

@ -824,8 +824,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
key_value_batch.push(KeyValueStoreOp::PutKeyValue(db_key, [].into()));
}
StoreOp::PutRawKVStoreOp(kv_store_op) => { StoreOp::KeyValueOp(kv_op) => {
key_value_batch.push(kv_store_op); key_value_batch.push(kv_op);
}
}
}
@ -870,7 +870,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::PutOrphanedBlobsKey(_) => (),
StoreOp::PutRawKVStoreOp(_) => (), StoreOp::KeyValueOp(_) => (),
}
}

View File

@ -166,7 +166,7 @@ pub enum StoreOp<'a, E: EthSpec> {
DeleteBlobs(Hash256),
DeleteState(Hash256, Option<Slot>),
DeleteExecutionPayload(Hash256),
PutRawKVStoreOp(KeyValueStoreOp), KeyValueOp(KeyValueStoreOp),
}
/// A unique column identifier.