Fix I/O atomicity issues with checkpoint sync (#2671)
## Issue Addressed

This PR addresses an issue found by @YorickDowne during testing of v2.0.0-rc.0. Due to a lack of atomic database writes on checkpoint sync start-up, it was possible for the database to get into an inconsistent state from which it couldn't recover without `--purge-db`. The core of the issue was that the store's anchor info was being stored _before_ the `PersistedBeaconChain`. If a crash occurred such that the anchor info was stored but _not_ the `PersistedBeaconChain`, then on restart Lighthouse would think the database was uninitialized and attempt to compare-and-swap a `None` value, but would actually find the stale anchor info from the previous run.

## Proposed Changes

The issue is fixed by writing the anchor info, the split point, and the `PersistedBeaconChain` atomically on start-up. Some type-hinting ugliness was required, which could possibly be cleaned up in future refactors.
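For illustration, here is a minimal self-contained sketch of the pattern the fix adopts. The `KvOp`/`Store` types and key names below are made-up stand-ins, not Lighthouse's real `KeyValueStoreOp` and `HotColdDB` API: metadata writes are staged in a pending batch and committed in a single atomic write together with the `PersistedBeaconChain`, so a crash can never leave the anchor info on disk without the chain record.

```rust
use std::collections::HashMap;

/// Illustrative stand-in for a staged database write (cf. `KeyValueStoreOp`,
/// which also has a delete variant).
enum KvOp {
    Put(&'static str, Vec<u8>),
}

/// Toy store. A real implementation would delegate `do_atomically` to the
/// backing database's write-batch support so that all ops land or none do.
#[derive(Default)]
struct Store {
    db: HashMap<&'static str, Vec<u8>>,
}

impl Store {
    fn do_atomically(&mut self, batch: Vec<KvOp>) {
        for op in batch {
            match op {
                KvOp::Put(key, value) => {
                    self.db.insert(key, value);
                }
            }
        }
    }
}

fn main() {
    let mut store = Store::default();

    // Stage the start-up metadata instead of writing each piece eagerly...
    let mut pending_io_batch = vec![
        KvOp::Put("split", b"split point".to_vec()),
        KvOp::Put("anchor_info", b"anchor".to_vec()),
    ];

    // ...then commit the chain record in the same batch, so a restart can
    // never observe anchor info without a `PersistedBeaconChain`.
    pending_io_batch.push(KvOp::Put("persisted_beacon_chain", b"chain".to_vec()));
    store.do_atomically(pending_io_batch);

    assert!(store.db.contains_key("anchor_info"));
    assert!(store.db.contains_key("persisted_beacon_chain"));
}
```

The same ordering constraint explains why `build` now takes `mut self` and stages the `PersistedBeaconChain` before constructing the `BeaconChain`: the chain's `Drop` implementation also persists the head, and letting it run first could write a `PersistedBeaconChain` without the rest of the batch.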
parent 28b79084cd
commit ed1fc7cca6
```diff
@@ -339,24 +339,41 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         Ok(())
     }
 
-    /// Return a `PersistedBeaconChain` representing the current head.
-    pub fn make_persisted_head(&self) -> PersistedBeaconChain {
+    /// Return a `PersistedBeaconChain` without reference to a `BeaconChain`.
+    pub fn make_persisted_head(
+        genesis_block_root: Hash256,
+        head_tracker: &HeadTracker,
+    ) -> PersistedBeaconChain {
         PersistedBeaconChain {
             _canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT,
-            genesis_block_root: self.genesis_block_root,
-            ssz_head_tracker: self.head_tracker.to_ssz_container(),
+            genesis_block_root,
+            ssz_head_tracker: head_tracker.to_ssz_container(),
         }
     }
 
     /// Return a database operation for writing the beacon chain head to disk.
     pub fn persist_head_in_batch(&self) -> KeyValueStoreOp {
-        self.make_persisted_head()
+        Self::persist_head_in_batch_standalone(self.genesis_block_root, &self.head_tracker)
+    }
+
+    pub fn persist_head_in_batch_standalone(
+        genesis_block_root: Hash256,
+        head_tracker: &HeadTracker,
+    ) -> KeyValueStoreOp {
+        Self::make_persisted_head(genesis_block_root, head_tracker)
             .as_kv_store_op(BEACON_CHAIN_DB_KEY)
     }
 
     /// Return a database operation for writing fork choice to disk.
     pub fn persist_fork_choice_in_batch(&self) -> KeyValueStoreOp {
         let fork_choice = self.fork_choice.read();
+        Self::persist_fork_choice_in_batch_standalone(&fork_choice)
+    }
+
+    /// Return a database operation for writing fork choice to disk.
+    pub fn persist_fork_choice_in_batch_standalone(
+        fork_choice: &BeaconForkChoice<T>,
+    ) -> KeyValueStoreOp {
         let persisted_fork_choice = PersistedForkChoice {
             fork_choice: fork_choice.to_persisted(),
             fork_choice_store: fork_choice.fc_store().to_persisted(),
@@ -25,7 +25,7 @@ use slot_clock::{SlotClock, TestingSlotClock};
 use std::marker::PhantomData;
 use std::sync::Arc;
 use std::time::Duration;
-use store::{Error as StoreError, HotColdDB, ItemStore};
+use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
 use task_executor::ShutdownReason;
 use types::{
     BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes,
@@ -87,6 +87,9 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
     graffiti: Graffiti,
     slasher: Option<Arc<Slasher<T::EthSpec>>>,
     validator_monitor: Option<ValidatorMonitor<T::EthSpec>>,
+    // Pending I/O batch that is constructed during building and should be executed atomically
+    // alongside `PersistedBeaconChain` storage when `BeaconChainBuilder::build` is called.
+    pending_io_batch: Vec<KeyValueStoreOp>,
 }
 
 impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
@@ -124,6 +127,7 @@ where
             graffiti: Graffiti::default(),
             slasher: None,
             validator_monitor: None,
+            pending_io_batch: vec![],
         }
     }
 
@@ -416,13 +420,11 @@ where
         // Set the store's split point *before* storing genesis so that genesis is stored
         // immediately in the freezer DB.
         store.set_split(weak_subj_slot, weak_subj_state_root);
-        store
-            .store_split()
-            .map_err(|e| format!("Error storing DB split point: {:?}", e))?;
 
         let (_, updated_builder) = self.set_genesis_state(genesis_state)?;
         self = updated_builder;
 
+        // Write the state and block non-atomically, it doesn't matter if they're forgotten
+        // about on a crash restart.
         store
             .put_state(&weak_subj_state_root, &weak_subj_state)
             .map_err(|e| format!("Failed to store weak subjectivity state: {:?}", e))?;
@@ -430,18 +432,22 @@ where
             .put_block(&weak_subj_block_root, weak_subj_block.clone())
             .map_err(|e| format!("Failed to store weak subjectivity block: {:?}", e))?;
 
-        // Store anchor info (context for weak subj sync).
-        store
-            .init_anchor_info(weak_subj_block.message())
-            .map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?;
+        // Stage the database's metadata fields for atomic storage when `build` is called.
+        // This prevents the database from restarting in an inconsistent state if the anchor
+        // info or split point is written before the `PersistedBeaconChain`.
+        self.pending_io_batch.push(store.store_split_in_batch());
+        self.pending_io_batch.push(
+            store
+                .init_anchor_info(weak_subj_block.message())
+                .map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
+        );
 
         // Store pruning checkpoint to prevent attempting to prune before the anchor state.
-        store
-            .store_pruning_checkpoint(Checkpoint {
+        self.pending_io_batch
+            .push(store.pruning_checkpoint_store_op(Checkpoint {
                 root: weak_subj_block_root,
                 epoch: weak_subj_state.slot().epoch(TEthSpec::slots_per_epoch()),
-            })
-            .map_err(|e| format!("Failed to write pruning checkpoint: {:?}", e))?;
+            }));
 
         let snapshot = BeaconSnapshot {
             beacon_block_root: weak_subj_block_root,
@@ -542,7 +548,7 @@ where
    /// configured.
    #[allow(clippy::type_complexity)] // I think there's nothing to be gained here from a type alias.
    pub fn build(
-        self,
+        mut self,
    ) -> Result<
        BeaconChain<Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>>,
        String,
@@ -679,6 +685,26 @@ where
             );
         }
 
+        // Store the `PersistedBeaconChain` in the database atomically with the metadata so that on
+        // restart we can correctly detect the presence of an initialized database.
+        //
+        // This *must* be stored before constructing the `BeaconChain`, so that its `Drop` instance
+        // doesn't write a `PersistedBeaconChain` without the rest of the batch.
+        self.pending_io_batch.push(BeaconChain::<
+            Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>,
+        >::persist_head_in_batch_standalone(
+            genesis_block_root, &head_tracker
+        ));
+        self.pending_io_batch.push(BeaconChain::<
+            Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>,
+        >::persist_fork_choice_in_batch_standalone(
+            &fork_choice
+        ));
+        store
+            .hot_db
+            .do_atomically(self.pending_io_batch)
+            .map_err(|e| format!("Error writing chain & metadata to disk: {:?}", e))?;
+
         let beacon_chain = BeaconChain {
             spec: self.spec,
             config: self.chain_config,
@@ -182,7 +182,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         };
         let backfill_complete = new_anchor.block_backfill_complete();
         self.store
-            .compare_and_set_anchor_info(Some(anchor_info), Some(new_anchor))?;
+            .compare_and_set_anchor_info_with_write(Some(anchor_info), Some(new_anchor))?;
 
         // If backfill has completed and the chain is configured to reconstruct historic states,
         // send a message to the background migrator instructing it to begin reconstruction.
@@ -261,7 +261,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     }
 
     /// Prepare a signed beacon block for storage in the database.
-    #[must_use]
     pub fn block_as_kv_store_op(
         &self,
         key: &Hash256,
@@ -973,7 +972,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     }
 
     /// Initialise the anchor info for checkpoint sync starting from `block`.
-    pub fn init_anchor_info(&self, block: BeaconBlockRef<'_, E>) -> Result<(), Error> {
+    pub fn init_anchor_info(&self, block: BeaconBlockRef<'_, E>) -> Result<KeyValueStoreOp, Error> {
         let anchor_slot = block.slot();
         let slots_per_restore_point = self.config.slots_per_restore_point;
 
@@ -1003,23 +1002,36 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
 
     /// Atomically update the anchor info from `prev_value` to `new_value`.
     ///
+    /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
+    /// values.
+    ///
     /// Return an `AnchorInfoConcurrentMutation` error if the `prev_value` provided
     /// is not correct.
     pub fn compare_and_set_anchor_info(
         &self,
         prev_value: Option<AnchorInfo>,
         new_value: Option<AnchorInfo>,
-    ) -> Result<(), Error> {
+    ) -> Result<KeyValueStoreOp, Error> {
         let mut anchor_info = self.anchor_info.write();
         if *anchor_info == prev_value {
-            self.store_anchor_info(&new_value)?;
+            let kv_op = self.store_anchor_info_in_batch(&new_value);
             *anchor_info = new_value;
-            Ok(())
+            Ok(kv_op)
         } else {
             Err(Error::AnchorInfoConcurrentMutation)
         }
     }
 
+    /// As for `compare_and_set_anchor_info`, but also writes the anchor to disk immediately.
+    pub fn compare_and_set_anchor_info_with_write(
+        &self,
+        prev_value: Option<AnchorInfo>,
+        new_value: Option<AnchorInfo>,
+    ) -> Result<(), Error> {
+        let kv_store_op = self.compare_and_set_anchor_info(prev_value, new_value)?;
+        self.hot_db.do_atomically(vec![kv_store_op])
+    }
+
     /// Load the anchor info from disk, but do not set `self.anchor_info`.
     fn load_anchor_info(&self) -> Result<Option<AnchorInfo>, Error> {
         self.hot_db.get(&ANCHOR_INFO_KEY)
@@ -1029,13 +1041,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     ///
     /// The argument is intended to be `self.anchor_info`, but is passed manually to avoid issues
     /// with recursive locking.
-    fn store_anchor_info(&self, anchor_info: &Option<AnchorInfo>) -> Result<(), Error> {
+    fn store_anchor_info_in_batch(&self, anchor_info: &Option<AnchorInfo>) -> KeyValueStoreOp {
         if let Some(ref anchor_info) = anchor_info {
-            self.hot_db.put(&ANCHOR_INFO_KEY, anchor_info)?;
+            anchor_info.as_kv_store_op(ANCHOR_INFO_KEY)
         } else {
-            self.hot_db.delete::<AnchorInfo>(&ANCHOR_INFO_KEY)?;
+            KeyValueStoreOp::DeleteKey(get_key_for_col(
+                DBColumn::BeaconMeta.into(),
+                ANCHOR_INFO_KEY.as_bytes(),
+            ))
         }
-        Ok(())
     }
 
     /// If an anchor exists, return its `anchor_slot` field.
@@ -1103,10 +1117,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         self.hot_db.get(&SPLIT_KEY)
     }
 
-    /// Store the split point to disk.
-    pub fn store_split(&self) -> Result<(), Error> {
-        self.hot_db.put_sync(&SPLIT_KEY, &*self.split.read())?;
-        Ok(())
+    /// Stage the split for storage to disk.
+    pub fn store_split_in_batch(&self) -> KeyValueStoreOp {
+        self.split.read_recursive().as_kv_store_op(SPLIT_KEY)
     }
 
     /// Load the state root of a restore point.
@@ -81,6 +81,7 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
     result
 }
 
+#[must_use]
 pub enum KeyValueStoreOp {
     PutKeyValue(Vec<u8>, Vec<u8>),
     DeleteKey(Vec<u8>),
@@ -189,7 +189,6 @@ impl<T: EthSpec> PartialBeaconState<T> {
    }
 
    /// Prepare the partial state for storage in the KV database.
-    #[must_use]
    pub fn as_kv_store_op(&self, state_root: Hash256) -> KeyValueStoreOp {
        let db_key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes());
        KeyValueStoreOp::PutKeyValue(db_key, self.as_ssz_bytes())
```
```diff
@@ -131,14 +131,17 @@ where
                 });
             }
 
            self.compare_and_set_anchor_info(old_anchor, None)?;
+            self.compare_and_set_anchor_info_with_write(old_anchor, None)?;
 
             return Ok(());
         } else {
             // The lower limit has been raised, store it.
             anchor.state_lower_limit = slot;
 
-            self.compare_and_set_anchor_info(old_anchor, Some(anchor.clone()))?;
+            self.compare_and_set_anchor_info_with_write(
+                old_anchor,
+                Some(anchor.clone()),
+            )?;
         }
     }
 }
```