Use advanced state for block production (#2241)
## Issue Addressed

NA

## Proposed Changes

- Use the pre-states from #2174 during block production.
    - Running this on Pyrmont shows block production times dropping from ~550ms to ~150ms.
- Create `crit` and `warn` logs when a block is published to the API later than we expect.
    - On mainnet we are issuing a warn if the block is published more than 1s later than the slot start and a crit for more than 3s.
- Rename some methods on the `SnapshotCache` for clarity.
- Add the ability to pass the state root to `BeaconChain::produce_block_on_state` to avoid computing a state root. This is a very common LH optimization.
- Add a metric that tracks how late we broadcast blocks received from the HTTP API. This is *technically* a duplicate of a `ValidatorMonitor` log, but I wanted to have it for the case where we aren't monitoring validators too.
This commit is contained in:
parent
363f15f362
commit
e4eb0eb168
@ -1799,16 +1799,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
let head_info = self
|
||||
.head_info()
|
||||
.map_err(BlockProductionError::UnableToGetHeadInfo)?;
|
||||
let state = if head_info.slot < slot {
|
||||
let (state, state_root_opt) = if head_info.slot < slot {
|
||||
// Normal case: proposing a block atop the current head. Use the snapshot cache.
|
||||
if let Some(snapshot) = self
|
||||
if let Some(pre_state) = self
|
||||
.snapshot_cache
|
||||
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
|
||||
.and_then(|snapshot_cache| {
|
||||
snapshot_cache.get_cloned(head_info.block_root, CloneConfig::all())
|
||||
snapshot_cache.get_state_for_block_production(head_info.block_root)
|
||||
})
|
||||
{
|
||||
snapshot.beacon_state
|
||||
(pre_state.pre_state, pre_state.state_root)
|
||||
} else {
|
||||
warn!(
|
||||
self.log,
|
||||
@ -1816,8 +1816,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
"message" => "this block is more likely to be orphaned",
|
||||
"slot" => slot,
|
||||
);
|
||||
self.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
|
||||
.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?
|
||||
let state = self
|
||||
.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
|
||||
.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?;
|
||||
|
||||
(state, None)
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
@ -1826,12 +1829,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
"message" => "this block is more likely to be orphaned",
|
||||
"slot" => slot,
|
||||
);
|
||||
self.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
|
||||
.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?
|
||||
let state = self
|
||||
.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
|
||||
.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?;
|
||||
|
||||
(state, None)
|
||||
};
|
||||
drop(state_load_timer);
|
||||
|
||||
self.produce_block_on_state(state, slot, randao_reveal, validator_graffiti)
|
||||
self.produce_block_on_state(
|
||||
state,
|
||||
state_root_opt,
|
||||
slot,
|
||||
randao_reveal,
|
||||
validator_graffiti,
|
||||
)
|
||||
}
|
||||
|
||||
/// Produce a block for some `slot` upon the given `state`.
|
||||
@ -1840,11 +1852,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// function directly. This function is useful for purposefully creating forks or blocks at
|
||||
/// non-current slots.
|
||||
///
|
||||
/// The given state will be advanced to the given `produce_at_slot`, then a block will be
|
||||
/// produced at that slot height.
|
||||
/// If required, the given state will be advanced to the given `produce_at_slot`, then a block
|
||||
/// will be produced at that slot height.
|
||||
///
|
||||
/// The provided `state_root_opt` should only ever be set to `Some` if the contained value is
|
||||
/// equal to the root of `state`. Providing this value will serve as an optimization to avoid
|
||||
/// performing a tree hash in some scenarios.
|
||||
pub fn produce_block_on_state(
|
||||
&self,
|
||||
mut state: BeaconState<T::EthSpec>,
|
||||
mut state_root_opt: Option<Hash256>,
|
||||
produce_at_slot: Slot,
|
||||
randao_reveal: Signature,
|
||||
validator_graffiti: Option<Graffiti>,
|
||||
@ -1854,13 +1871,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.as_ref()
|
||||
.ok_or(BlockProductionError::NoEth1ChainConnection)?;
|
||||
|
||||
// It is invalid to try to produce a block using a state from a future slot.
|
||||
if state.slot > produce_at_slot {
|
||||
return Err(BlockProductionError::StateSlotTooHigh {
|
||||
produce_at_slot,
|
||||
state_slot: state.slot,
|
||||
});
|
||||
}
|
||||
|
||||
let slot_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_SLOT_PROCESS_TIMES);
|
||||
// If required, transition the new state to the present slot.
|
||||
//
|
||||
// Note: supplying some `state_root` when it is known would be a cheap and easy
|
||||
// optimization.
|
||||
while state.slot < produce_at_slot {
|
||||
per_slot_processing(&mut state, None, &self.spec)?;
|
||||
// Using `state_root_opt.take()` here ensures that we consume the `state_root_opt` on the first
|
||||
// iteration and never use it again.
|
||||
per_slot_processing(&mut state, state_root_opt.take(), &self.spec)?;
|
||||
}
|
||||
drop(slot_timer);
|
||||
|
||||
|
@ -1242,8 +1242,9 @@ fn load_parent<T: BeaconChainTypes>(
|
||||
let result = if let Some(snapshot) = chain
|
||||
.snapshot_cache
|
||||
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
|
||||
.and_then(|mut snapshot_cache| snapshot_cache.try_remove(block.parent_root()))
|
||||
{
|
||||
.and_then(|mut snapshot_cache| {
|
||||
snapshot_cache.get_state_for_block_processing(block.parent_root())
|
||||
}) {
|
||||
Ok((snapshot.into_pre_state(), block))
|
||||
} else {
|
||||
// Load the block's parent block from the database, returning invalid if that block is not
|
||||
|
@ -137,6 +137,10 @@ pub enum BlockProductionError {
|
||||
/// The `BeaconChain` was explicitly configured _without_ a connection to eth1, therefore it
|
||||
/// cannot produce blocks.
|
||||
NoEth1ChainConnection,
|
||||
StateSlotTooHigh {
|
||||
produce_at_slot: Slot,
|
||||
state_slot: Slot,
|
||||
},
|
||||
}
|
||||
|
||||
easy_from_to!(BlockProcessingError, BlockProductionError);
|
||||
|
@ -55,14 +55,18 @@ impl<T: EthSpec> CacheItem<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: EthSpec> Into<BeaconSnapshot<T>> for CacheItem<T> {
|
||||
fn into(self) -> BeaconSnapshot<T> {
|
||||
BeaconSnapshot {
|
||||
beacon_state: self.beacon_state,
|
||||
beacon_block: self.beacon_block,
|
||||
beacon_block_root: self.beacon_block_root,
|
||||
}
|
||||
}
|
||||
/// The information required for block production.
|
||||
pub struct BlockProductionPreState<T: EthSpec> {
|
||||
/// This state may or may not have been advanced forward a single slot.
|
||||
///
|
||||
/// See the documentation in the `crate::state_advance_timer` module for more information.
|
||||
pub pre_state: BeaconState<T>,
|
||||
/// This value will only be `Some` if `self.pre_state` was **not** advanced forward a single
|
||||
/// slot.
|
||||
///
|
||||
/// This value can be used to avoid tree-hashing the state during the first call to
|
||||
/// `per_slot_processing`.
|
||||
pub state_root: Option<Hash256>,
|
||||
}
|
||||
|
||||
pub enum StateAdvance<T: EthSpec> {
|
||||
@ -89,6 +93,16 @@ pub struct CacheItem<T: EthSpec> {
|
||||
pre_state: Option<BeaconState<T>>,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> Into<BeaconSnapshot<T>> for CacheItem<T> {
|
||||
fn into(self) -> BeaconSnapshot<T> {
|
||||
BeaconSnapshot {
|
||||
beacon_state: self.beacon_state,
|
||||
beacon_block: self.beacon_block,
|
||||
beacon_block_root: self.beacon_block_root,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Provides a cache of `BeaconSnapshot` that is intended primarily for block processing.
|
||||
///
|
||||
/// ## Cache Queuing
|
||||
@ -152,14 +166,46 @@ impl<T: EthSpec> SnapshotCache<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// If there is a snapshot with `block_root`, remove and return it.
|
||||
pub fn try_remove(&mut self, block_root: Hash256) -> Option<CacheItem<T>> {
|
||||
/// If available, returns a `CacheItem` that should be used for importing/processing a block.
|
||||
/// The method will remove the block from `self`, carrying across any caches that may or may not
|
||||
/// be built.
|
||||
pub fn get_state_for_block_processing(&mut self, block_root: Hash256) -> Option<CacheItem<T>> {
|
||||
self.snapshots
|
||||
.iter()
|
||||
.position(|snapshot| snapshot.beacon_block_root == block_root)
|
||||
.map(|i| self.snapshots.remove(i))
|
||||
}
|
||||
|
||||
/// If available, obtains a clone of a `BeaconState` that should be used for block production.
|
||||
/// The clone will use `CloneConfig::all()`, ensuring any tree-hash cache is cloned too.
|
||||
///
|
||||
/// ## Note
|
||||
///
|
||||
/// This method clones the `BeaconState` (instead of removing it) since we assume that any block
|
||||
/// we produce will soon be pushed to the `BeaconChain` for importing/processing. Keeping a copy
|
||||
/// of that `BeaconState` in `self` will greatly help with import times.
|
||||
pub fn get_state_for_block_production(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
) -> Option<BlockProductionPreState<T>> {
|
||||
self.snapshots
|
||||
.iter()
|
||||
.find(|snapshot| snapshot.beacon_block_root == block_root)
|
||||
.map(|snapshot| {
|
||||
if let Some(pre_state) = &snapshot.pre_state {
|
||||
BlockProductionPreState {
|
||||
pre_state: pre_state.clone_with(CloneConfig::all()),
|
||||
state_root: None,
|
||||
}
|
||||
} else {
|
||||
BlockProductionPreState {
|
||||
pre_state: snapshot.beacon_state.clone_with(CloneConfig::all()),
|
||||
state_root: Some(snapshot.beacon_block.state_root()),
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// If there is a snapshot with `block_root`, clone it and return the clone.
|
||||
pub fn get_cloned(
|
||||
&self,
|
||||
@ -288,7 +334,9 @@ mod test {
|
||||
assert_eq!(cache.snapshots.len(), CACHE_SIZE);
|
||||
|
||||
assert!(
|
||||
cache.try_remove(Hash256::from_low_u64_be(1)).is_none(),
|
||||
cache
|
||||
.get_state_for_block_processing(Hash256::from_low_u64_be(1))
|
||||
.is_none(),
|
||||
"the snapshot with the lowest slot should have been removed during the insert function"
|
||||
);
|
||||
assert!(cache
|
||||
@ -305,17 +353,17 @@ mod test {
|
||||
);
|
||||
assert!(
|
||||
cache
|
||||
.try_remove(Hash256::from_low_u64_be(0))
|
||||
.get_state_for_block_processing(Hash256::from_low_u64_be(0))
|
||||
.expect("the head should still be in the cache")
|
||||
.beacon_block_root
|
||||
== Hash256::from_low_u64_be(0),
|
||||
"try_remove should get the correct snapshot"
|
||||
"get_state_for_block_processing should get the correct snapshot"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
cache.snapshots.len(),
|
||||
CACHE_SIZE - 1,
|
||||
"try_remove should shorten the cache"
|
||||
"get_state_for_block_processing should shorten the cache"
|
||||
);
|
||||
|
||||
// Prune the cache. Afterwards it should look like:
|
||||
@ -337,11 +385,11 @@ mod test {
|
||||
// Ensure that the new head value was not removed from the cache.
|
||||
assert!(
|
||||
cache
|
||||
.try_remove(Hash256::from_low_u64_be(2))
|
||||
.get_state_for_block_processing(Hash256::from_low_u64_be(2))
|
||||
.expect("the new head should still be in the cache")
|
||||
.beacon_block_root
|
||||
== Hash256::from_low_u64_be(2),
|
||||
"try_remove should get the correct snapshot"
|
||||
"get_state_for_block_processing should get the correct snapshot"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -408,7 +408,7 @@ where
|
||||
|
||||
let (block, state) = self
|
||||
.chain
|
||||
.produce_block_on_state(state, slot, randao_reveal, Some(graffiti))
|
||||
.produce_block_on_state(state, None, slot, randao_reveal, Some(graffiti))
|
||||
.unwrap();
|
||||
|
||||
let signed_block = block.sign(
|
||||
|
@ -13,7 +13,8 @@ mod validator_inclusion;
|
||||
|
||||
use beacon_chain::{
|
||||
attestation_verification::SignatureVerifiedAttestation,
|
||||
observed_operations::ObservationOutcome, validator_monitor::timestamp_now,
|
||||
observed_operations::ObservationOutcome,
|
||||
validator_monitor::{get_block_delay_ms, timestamp_now},
|
||||
AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
|
||||
};
|
||||
use beacon_proposer_cache::BeaconProposerCache;
|
||||
@ -822,6 +823,15 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
PubsubMessage::BeaconBlock(Box::new(block.clone())),
|
||||
)?;
|
||||
|
||||
// Determine the delay after the start of the slot, register it with metrics.
|
||||
let delay =
|
||||
get_block_delay_ms(seen_timestamp, &block.message, &chain.slot_clock);
|
||||
metrics::observe_duration(
|
||||
&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES,
|
||||
delay,
|
||||
);
|
||||
|
||||
|
||||
match chain.process_block(block.clone()) {
|
||||
Ok(root) => {
|
||||
info!(
|
||||
@ -844,6 +854,33 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.fork_choice()
|
||||
.map_err(warp_utils::reject::beacon_chain_error)?;
|
||||
|
||||
// Perform some logging to inform users if their blocks are being produced
|
||||
// late.
|
||||
//
|
||||
// Check to see the thresholds are non-zero to avoid logging errors with small
|
||||
// slot times (e.g., during testing)
|
||||
let crit_threshold = chain.spec.seconds_per_slot / 3;
|
||||
let warn_threshold = chain.spec.seconds_per_slot / 6;
|
||||
if crit_threshold > 0 && delay.as_secs() > crit_threshold {
|
||||
crit!(
|
||||
log,
|
||||
"Block was broadcast too late";
|
||||
"root" => ?root,
|
||||
"slot" => block.slot(),
|
||||
"delay_ms" => delay.as_millis(),
|
||||
"msg" => "system may be overloaded, block likely to be orphaned",
|
||||
)
|
||||
} else if warn_threshold > 0 && delay.as_secs() > warn_threshold {
|
||||
warn!(
|
||||
log,
|
||||
"Block broadcast was delayed";
|
||||
"root" => ?root,
|
||||
"slot" => block.slot(),
|
||||
"delay_ms" => delay.as_millis(),
|
||||
"msg" => "system may be overloaded, block may be orphaned",
|
||||
)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
|
@ -29,4 +29,8 @@ lazy_static::lazy_static! {
|
||||
"http_api_beacon_proposer_cache_misses_total",
|
||||
"Count of times the proposer cache has been missed",
|
||||
);
|
||||
pub static ref HTTP_API_BLOCK_BROADCAST_DELAY_TIMES: Result<Histogram> = try_create_histogram(
|
||||
"http_api_block_broadcast_delay_times",
|
||||
"Time between start of the slot and when the block was broadcast"
|
||||
);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user