Hold HeadTracker lock until persisting to disk (#5084)
* Fix head tracker drop order on un-ordered shutdown * lint --------- Co-authored-by: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
parent
185646acb2
commit
585124fb2f
@ -30,7 +30,7 @@ use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData
|
|||||||
use crate::events::ServerSentEventHandler;
|
use crate::events::ServerSentEventHandler;
|
||||||
use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle};
|
use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle};
|
||||||
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
|
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
|
||||||
use crate::head_tracker::HeadTracker;
|
use crate::head_tracker::{HeadTracker, HeadTrackerReader, SszHeadTracker};
|
||||||
use crate::historical_blocks::HistoricalBlockError;
|
use crate::historical_blocks::HistoricalBlockError;
|
||||||
use crate::light_client_finality_update_verification::{
|
use crate::light_client_finality_update_verification::{
|
||||||
Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate,
|
Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate,
|
||||||
@ -610,12 +610,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
let mut batch = vec![];
|
let mut batch = vec![];
|
||||||
|
|
||||||
let _head_timer = metrics::start_timer(&metrics::PERSIST_HEAD);
|
let _head_timer = metrics::start_timer(&metrics::PERSIST_HEAD);
|
||||||
batch.push(self.persist_head_in_batch());
|
|
||||||
|
// Hold a lock to head_tracker until it has been persisted to disk. Otherwise there's a race
|
||||||
|
// condition with the pruning thread which can result in a block present in the head tracker
|
||||||
|
// but absent in the DB. This inconsistency halts pruning and dramastically increases disk
|
||||||
|
// size. Ref: https://github.com/sigp/lighthouse/issues/4773
|
||||||
|
let head_tracker = self.head_tracker.0.read();
|
||||||
|
batch.push(self.persist_head_in_batch(&head_tracker));
|
||||||
|
|
||||||
let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE);
|
let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE);
|
||||||
batch.push(self.persist_fork_choice_in_batch());
|
batch.push(self.persist_fork_choice_in_batch());
|
||||||
|
|
||||||
self.store.hot_db.do_atomically(batch)?;
|
self.store.hot_db.do_atomically(batch)?;
|
||||||
|
drop(head_tracker);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -623,25 +630,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
/// Return a `PersistedBeaconChain` without reference to a `BeaconChain`.
|
/// Return a `PersistedBeaconChain` without reference to a `BeaconChain`.
|
||||||
pub fn make_persisted_head(
|
pub fn make_persisted_head(
|
||||||
genesis_block_root: Hash256,
|
genesis_block_root: Hash256,
|
||||||
head_tracker: &HeadTracker,
|
head_tracker_reader: &HeadTrackerReader,
|
||||||
) -> PersistedBeaconChain {
|
) -> PersistedBeaconChain {
|
||||||
PersistedBeaconChain {
|
PersistedBeaconChain {
|
||||||
_canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT,
|
_canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT,
|
||||||
genesis_block_root,
|
genesis_block_root,
|
||||||
ssz_head_tracker: head_tracker.to_ssz_container(),
|
ssz_head_tracker: SszHeadTracker::from_map(head_tracker_reader),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return a database operation for writing the beacon chain head to disk.
|
/// Return a database operation for writing the beacon chain head to disk.
|
||||||
pub fn persist_head_in_batch(&self) -> KeyValueStoreOp {
|
pub fn persist_head_in_batch(
|
||||||
Self::persist_head_in_batch_standalone(self.genesis_block_root, &self.head_tracker)
|
&self,
|
||||||
|
head_tracker_reader: &HeadTrackerReader,
|
||||||
|
) -> KeyValueStoreOp {
|
||||||
|
Self::persist_head_in_batch_standalone(self.genesis_block_root, head_tracker_reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn persist_head_in_batch_standalone(
|
pub fn persist_head_in_batch_standalone(
|
||||||
genesis_block_root: Hash256,
|
genesis_block_root: Hash256,
|
||||||
head_tracker: &HeadTracker,
|
head_tracker_reader: &HeadTrackerReader,
|
||||||
) -> KeyValueStoreOp {
|
) -> KeyValueStoreOp {
|
||||||
Self::make_persisted_head(genesis_block_root, head_tracker)
|
Self::make_persisted_head(genesis_block_root, head_tracker_reader)
|
||||||
.as_kv_store_op(BEACON_CHAIN_DB_KEY)
|
.as_kv_store_op(BEACON_CHAIN_DB_KEY)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1341,6 +1351,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
self.head_tracker.heads()
|
self.head_tracker.heads()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Only used in tests.
|
||||||
pub fn knows_head(&self, block_hash: &SignedBeaconBlockHash) -> bool {
|
pub fn knows_head(&self, block_hash: &SignedBeaconBlockHash) -> bool {
|
||||||
self.head_tracker.contains_head((*block_hash).into())
|
self.head_tracker.contains_head((*block_hash).into())
|
||||||
}
|
}
|
||||||
|
@ -805,10 +805,11 @@ where
|
|||||||
//
|
//
|
||||||
// This *must* be stored before constructing the `BeaconChain`, so that its `Drop` instance
|
// This *must* be stored before constructing the `BeaconChain`, so that its `Drop` instance
|
||||||
// doesn't write a `PersistedBeaconChain` without the rest of the batch.
|
// doesn't write a `PersistedBeaconChain` without the rest of the batch.
|
||||||
|
let head_tracker_reader = head_tracker.0.read();
|
||||||
self.pending_io_batch.push(BeaconChain::<
|
self.pending_io_batch.push(BeaconChain::<
|
||||||
Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>,
|
Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>,
|
||||||
>::persist_head_in_batch_standalone(
|
>::persist_head_in_batch_standalone(
|
||||||
genesis_block_root, &head_tracker
|
genesis_block_root, &head_tracker_reader
|
||||||
));
|
));
|
||||||
self.pending_io_batch.push(BeaconChain::<
|
self.pending_io_batch.push(BeaconChain::<
|
||||||
Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>,
|
Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>,
|
||||||
@ -819,6 +820,7 @@ where
|
|||||||
.hot_db
|
.hot_db
|
||||||
.do_atomically(self.pending_io_batch)
|
.do_atomically(self.pending_io_batch)
|
||||||
.map_err(|e| format!("Error writing chain & metadata to disk: {:?}", e))?;
|
.map_err(|e| format!("Error writing chain & metadata to disk: {:?}", e))?;
|
||||||
|
drop(head_tracker_reader);
|
||||||
|
|
||||||
let genesis_validators_root = head_snapshot.beacon_state.genesis_validators_root();
|
let genesis_validators_root = head_snapshot.beacon_state.genesis_validators_root();
|
||||||
let genesis_time = head_snapshot.beacon_state.genesis_time();
|
let genesis_time = head_snapshot.beacon_state.genesis_time();
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use parking_lot::RwLock;
|
use parking_lot::{RwLock, RwLockReadGuard};
|
||||||
use ssz_derive::{Decode, Encode};
|
use ssz_derive::{Decode, Encode};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use types::{Hash256, Slot};
|
use types::{Hash256, Slot};
|
||||||
@ -16,6 +16,8 @@ pub enum Error {
|
|||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct HeadTracker(pub RwLock<HashMap<Hash256, Slot>>);
|
pub struct HeadTracker(pub RwLock<HashMap<Hash256, Slot>>);
|
||||||
|
|
||||||
|
pub type HeadTrackerReader<'a> = RwLockReadGuard<'a, HashMap<Hash256, Slot>>;
|
||||||
|
|
||||||
impl HeadTracker {
|
impl HeadTracker {
|
||||||
/// Register a block with `Self`, so it may or may not be included in a `Self::heads` call.
|
/// Register a block with `Self`, so it may or may not be included in a `Self::heads` call.
|
||||||
///
|
///
|
||||||
@ -44,6 +46,11 @@ impl HeadTracker {
|
|||||||
|
|
||||||
/// Returns a `SszHeadTracker`, which contains all necessary information to restore the state
|
/// Returns a `SszHeadTracker`, which contains all necessary information to restore the state
|
||||||
/// of `Self` at some later point.
|
/// of `Self` at some later point.
|
||||||
|
///
|
||||||
|
/// Should ONLY be used for tests, due to the potential for database races.
|
||||||
|
///
|
||||||
|
/// See <https://github.com/sigp/lighthouse/issues/4773>
|
||||||
|
#[cfg(test)]
|
||||||
pub fn to_ssz_container(&self) -> SszHeadTracker {
|
pub fn to_ssz_container(&self) -> SszHeadTracker {
|
||||||
SszHeadTracker::from_map(&self.0.read())
|
SszHeadTracker::from_map(&self.0.read())
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user