Clone state ahead of block production (#4925)

* Clone state ahead of block production

* Add pruning and fix logging

* Don't hold 2 states in mem
Michael Sproul 2023-11-30 13:49:35 +11:00 committed by GitHub
parent 43d98153d6
commit 547ed1de63
3 changed files with 117 additions and 21 deletions
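
At a high level, the commit adds a one-entry cache that the state advance timer fills shortly after fork choice completes, and that block production consumes only when the cached entry was cloned on top of the head actually being proposed on. Below is a minimal, self-contained sketch of that fill/take pattern; `CachedState`, `BlockRoot`, `fill_cache`, and `take_for_proposal` are illustrative stand-ins rather than names from the diff, which stores a `(Hash256, BlockProductionPreState)` pair behind an `Arc<Mutex<Option<...>>>`.

use std::sync::{Arc, Mutex};

/// Stand-in for the cloned pre-state (the real cache holds a
/// `BlockProductionPreState` whose tree hash cache is already built).
struct CachedState {
    slot: u64,
}

/// Stand-in for `Hash256`: the root of the block the state sits on top of.
type BlockRoot = [u8; 32];

/// One-entry cache: at most one `(head_root, state)` pair at a time.
type BlockProductionCache = Arc<Mutex<Option<(BlockRoot, CachedState)>>>;

/// Fill the cache ahead of block production, replacing any stale entry so
/// that at most one cloned state is held in memory.
fn fill_cache(cache: &BlockProductionCache, head_root: BlockRoot, state: CachedState) {
    *cache.lock().unwrap() = Some((head_root, state));
}

/// Consume the cached state at proposal time, but only if it was cloned on
/// top of the head being proposed on. On a mismatch the entry is discarded
/// and the caller falls back to a slower clone (the snapshot cache in the
/// real code), mirroring the `take().filter(..)` chain in the diff below.
fn take_for_proposal(cache: &BlockProductionCache, head_root: BlockRoot) -> Option<CachedState> {
    cache
        .lock()
        .unwrap()
        .take()
        .filter(|(cached_root, _)| *cached_root == head_root)
        .map(|(_, state)| state)
}

fn main() {
    let cache: BlockProductionCache = Arc::new(Mutex::new(None));
    let head = [0u8; 32];

    fill_cache(&cache, head, CachedState { slot: 100 });
    // Hit: the state was cloned ahead of time for this exact head.
    assert_eq!(take_for_proposal(&cache, head).map(|s| s.slot), Some(100));
    // Miss after the take: the caller would fall back to the snapshot cache.
    assert!(take_for_proposal(&cache, head).is_none());
}

In the diff itself, the fill side runs in the state advance timer task, while the take side appears twice: once on the normal proposal path and once on the proposer re-org path, each falling back to the snapshot cache on a miss.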

View File

@@ -482,6 +482,11 @@ pub struct BeaconChain<T: BeaconChainTypes> {
     pub data_availability_checker: Arc<DataAvailabilityChecker<T>>,
     /// The KZG trusted setup used by this chain.
     pub kzg: Option<Arc<Kzg>>,
+    /// State with complete tree hash cache, ready for block production.
+    ///
+    /// NB: We can delete this once we have tree-states.
+    #[allow(clippy::type_complexity)]
+    pub block_production_state: Arc<Mutex<Option<(Hash256, BlockProductionPreState<T::EthSpec>)>>>,
 }
 pub enum BeaconBlockResponseType<T: EthSpec> {
@@ -4030,7 +4035,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             );
             (re_org_state.pre_state, re_org_state.state_root)
         }
-        // Normal case: proposing a block atop the current head. Use the snapshot cache.
+        // Normal case: proposing a block atop the current head using the cache.
+        else if let Some((_, cached_state)) = self
+            .block_production_state
+            .lock()
+            .take()
+            .filter(|(cached_block_root, _)| *cached_block_root == head_block_root)
+        {
+            (cached_state.pre_state, cached_state.state_root)
+        }
+        // Fall back to a direct read of the snapshot cache.
         else if let Some(pre_state) = self
             .snapshot_cache
             .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
@@ -4038,6 +4052,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 snapshot_cache.get_state_for_block_production(head_block_root)
             })
         {
+            warn!(
+                self.log,
+                "Block production cache miss";
+                "message" => "falling back to snapshot cache clone",
+                "slot" => slot
+            );
             (pre_state.pre_state, pre_state.state_root)
         } else {
             warn!(
@@ -4161,12 +4181,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         drop(proposer_head_timer);
         let re_org_parent_block = proposer_head.parent_node.root;
-        // Only attempt a re-org if we hit the snapshot cache.
+        // Only attempt a re-org if we hit the block production cache or snapshot cache.
         let pre_state = self
-            .snapshot_cache
-            .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
-            .and_then(|snapshot_cache| {
-                snapshot_cache.get_state_for_block_production(re_org_parent_block)
+            .block_production_state
+            .lock()
+            .take()
+            .and_then(|(cached_block_root, state)| {
+                (cached_block_root == re_org_parent_block).then_some(state)
+            })
+            .or_else(|| {
+                warn!(
+                    self.log,
+                    "Block production cache miss";
+                    "message" => "falling back to snapshot cache during re-org",
+                    "slot" => slot,
+                    "block_root" => ?re_org_parent_block
+                );
+                self.snapshot_cache
+                    .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
+                    .and_then(|snapshot_cache| {
+                        snapshot_cache.get_state_for_block_production(re_org_parent_block)
+                    })
             })
             .or_else(|| {
                 debug!(
@@ -5326,15 +5361,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     ///
     /// This function will result in a call to `forkchoiceUpdated` on the EL if we're in the
     /// tail-end of the slot (as defined by `self.config.prepare_payload_lookahead`).
+    ///
+    /// Return `Ok(Some(head_block_root))` if this node prepared to propose at the next slot on
+    /// top of `head_block_root`.
     pub async fn prepare_beacon_proposer(
         self: &Arc<Self>,
         current_slot: Slot,
-    ) -> Result<(), Error> {
+    ) -> Result<Option<Hash256>, Error> {
         let prepare_slot = current_slot + 1;
         // There's no need to run the proposer preparation routine before the bellatrix fork.
         if self.slot_is_prior_to_bellatrix(prepare_slot) {
-            return Ok(());
+            return Ok(None);
         }
         let execution_layer = self
@@ -5347,7 +5385,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         if !self.config.always_prepare_payload
             && !execution_layer.has_any_proposer_preparation_data().await
         {
-            return Ok(());
+            return Ok(None);
         }
         // Load the cached head and its forkchoice update parameters.
@@ -5394,7 +5432,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         let Some((forkchoice_update_params, Some(pre_payload_attributes))) = maybe_prep_data else {
             // Appropriate log messages have already been logged above and in
             // `get_pre_payload_attributes`.
-            return Ok(());
+            return Ok(None);
         };
         // If the execution layer doesn't have any proposer data for this validator then we assume
@@ -5405,7 +5443,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .has_proposer_preparation_data(proposer)
             .await
         {
-            return Ok(());
+            return Ok(None);
         }
         // Fetch payload attributes from the execution layer's cache, or compute them from scratch
@@ -5500,7 +5538,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 "prepare_slot" => prepare_slot,
                 "validator" => proposer,
             );
-            return Ok(());
+            return Ok(None);
         };
         // If we are close enough to the proposal slot, send an fcU, which will have payload
@@ -5523,7 +5561,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 .await?;
         }
-        Ok(())
+        Ok(Some(head_root))
     }
     pub async fn update_execution_engine_forkchoice(

View File

@@ -925,6 +925,7 @@ where
                 .map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?,
             ),
             kzg,
+            block_production_state: Arc::new(Mutex::new(None)),
         };
         let head = beacon_chain.head_snapshot();

View File

@@ -45,6 +45,9 @@ const MAX_ADVANCE_DISTANCE: u64 = 4;
 /// impact whilst having 8 epochs without a block is a comfortable grace period.
 const MAX_FORK_CHOICE_DISTANCE: u64 = 256;
+/// Drop any unused block production state cache after this many slots.
+const MAX_BLOCK_PRODUCTION_CACHE_DISTANCE: u64 = 4;
 #[derive(Debug)]
 enum Error {
     BeaconChain(BeaconChainError),
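
The constant added in the hunk above bounds how long an unused cloned state may linger before being dropped. A minimal sketch of that pruning rule follows; `should_prune` and `cached_state_slot` are illustrative names, standing in for the `state.pre_state.slot() + MAX_BLOCK_PRODUCTION_CACHE_DISTANCE <= current_slot` check in the next hunk.

/// Mirror of the constant introduced above.
const MAX_BLOCK_PRODUCTION_CACHE_DISTANCE: u64 = 4;

/// An unused cached block-production state is kept while the chain is within
/// `MAX_BLOCK_PRODUCTION_CACHE_DISTANCE` slots of it and dropped afterwards.
fn should_prune(cached_state_slot: u64, current_slot: u64) -> bool {
    cached_state_slot + MAX_BLOCK_PRODUCTION_CACHE_DISTANCE <= current_slot
}

fn main() {
    // A state cloned at slot 100 survives timer runs up to slot 103...
    assert!(!should_prune(100, 103));
    // ...and is dropped from slot 104 onwards if no proposal consumed it.
    assert!(should_prune(100, 104));
}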
@@ -227,19 +230,73 @@ async fn state_advance_timer<T: BeaconChainTypes>(
         // Prepare proposers so that the node can send payload attributes in the case where
         // it decides to abandon a proposer boost re-org.
-        if let Err(e) = beacon_chain.prepare_beacon_proposer(current_slot).await {
-            warn!(
-                log,
-                "Unable to prepare proposer with lookahead";
-                "error" => ?e,
-                "slot" => next_slot,
-            );
-        }
+        let proposer_head = beacon_chain
+            .prepare_beacon_proposer(current_slot)
+            .await
+            .unwrap_or_else(|e| {
+                warn!(
+                    log,
+                    "Unable to prepare proposer with lookahead";
+                    "error" => ?e,
+                    "slot" => next_slot,
+                );
+                None
+            });
         // Use a blocking task to avoid blocking the core executor whilst waiting for locks
         // in `ForkChoiceSignalTx`.
         beacon_chain.task_executor.clone().spawn_blocking(
             move || {
+                // If we're proposing, clone the head state preemptively so that it isn't on
+                // the hot path of proposing. We can delete this once we have tree-states.
+                if let Some(proposer_head) = proposer_head {
+                    let mut cache = beacon_chain.block_production_state.lock();
+                    // Avoid holding two states in memory. It's OK to hold the lock because
+                    // we always lock the block production cache before the snapshot cache
+                    // and we prefer for block production to wait for the block production
+                    // cache if a clone is in-progress.
+                    if cache
+                        .as_ref()
+                        .map_or(false, |(cached_head, _)| *cached_head != proposer_head)
+                    {
+                        drop(cache.take());
+                    }
+                    if let Some(proposer_state) = beacon_chain
+                        .snapshot_cache
+                        .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
+                        .and_then(|snapshot_cache| {
+                            snapshot_cache.get_state_for_block_production(proposer_head)
+                        })
+                    {
+                        *cache = Some((proposer_head, proposer_state));
+                        debug!(
+                            log,
+                            "Cloned state ready for block production";
+                            "head_block_root" => ?proposer_head,
+                            "slot" => next_slot
+                        );
+                    } else {
+                        warn!(
+                            log,
+                            "Block production state missing from snapshot cache";
+                            "head_block_root" => ?proposer_head,
+                            "slot" => next_slot
+                        );
+                    }
+                } else {
+                    // If we aren't proposing, drop any old block production cache to save
+                    // memory.
+                    let mut cache = beacon_chain.block_production_state.lock();
+                    if let Some((_, state)) = &*cache {
+                        if state.pre_state.slot() + MAX_BLOCK_PRODUCTION_CACHE_DISTANCE
+                            <= current_slot
+                        {
+                            drop(cache.take());
+                        }
+                    }
+                }
                 // Signal block proposal for the next slot (if it happens to be waiting).
                 if let Some(tx) = &beacon_chain.fork_choice_signal_tx {
                     if let Err(e) = tx.notify_fork_choice_complete(next_slot) {