check the da cache and the attester cache in responding to RPC requests (#5138)
* check the da cache and the attester cache in responding to RPC requests * use the processing cache instead * update comment * add da cache metrics * rename early attester cache method * Merge branch 'unstable' of https://github.com/sigp/lighthouse into check-da-cache-in-rpc-response * make rustup update run on the runners * Revert "make rustup update run on the runners" This reverts commit d097e9bfa84a13b1d7813c03df38e7756fb0bfc5.
This commit is contained in:
parent
a264afd19f
commit
f21472991d
@ -19,7 +19,7 @@ use types::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
#[derive(PartialEq)]
|
#[derive(PartialEq)]
|
||||||
pub enum CheckEarlyAttesterCache {
|
pub enum CheckCaches {
|
||||||
Yes,
|
Yes,
|
||||||
No,
|
No,
|
||||||
}
|
}
|
||||||
@ -385,14 +385,14 @@ impl<E: EthSpec> EngineRequest<E> {
|
|||||||
|
|
||||||
pub struct BeaconBlockStreamer<T: BeaconChainTypes> {
|
pub struct BeaconBlockStreamer<T: BeaconChainTypes> {
|
||||||
execution_layer: ExecutionLayer<T::EthSpec>,
|
execution_layer: ExecutionLayer<T::EthSpec>,
|
||||||
check_early_attester_cache: CheckEarlyAttesterCache,
|
check_caches: CheckCaches,
|
||||||
beacon_chain: Arc<BeaconChain<T>>,
|
beacon_chain: Arc<BeaconChain<T>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
beacon_chain: &Arc<BeaconChain<T>>,
|
beacon_chain: &Arc<BeaconChain<T>>,
|
||||||
check_early_attester_cache: CheckEarlyAttesterCache,
|
check_caches: CheckCaches,
|
||||||
) -> Result<Self, BeaconChainError> {
|
) -> Result<Self, BeaconChainError> {
|
||||||
let execution_layer = beacon_chain
|
let execution_layer = beacon_chain
|
||||||
.execution_layer
|
.execution_layer
|
||||||
@ -402,17 +402,17 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
|||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
execution_layer,
|
execution_layer,
|
||||||
check_early_attester_cache,
|
check_caches,
|
||||||
beacon_chain: beacon_chain.clone(),
|
beacon_chain: beacon_chain.clone(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check_early_attester_cache(
|
fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
|
||||||
&self,
|
if self.check_caches == CheckCaches::Yes {
|
||||||
root: Hash256,
|
self.beacon_chain
|
||||||
) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
|
.data_availability_checker
|
||||||
if self.check_early_attester_cache == CheckEarlyAttesterCache::Yes {
|
.get_block(&root)
|
||||||
self.beacon_chain.early_attester_cache.get_block(root)
|
.or(self.beacon_chain.early_attester_cache.get_block(root))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
@ -422,10 +422,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
|||||||
let mut db_blocks = Vec::new();
|
let mut db_blocks = Vec::new();
|
||||||
|
|
||||||
for root in block_roots {
|
for root in block_roots {
|
||||||
if let Some(cached_block) = self
|
if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) {
|
||||||
.check_early_attester_cache(root)
|
|
||||||
.map(LoadedBeaconBlock::Full)
|
|
||||||
{
|
|
||||||
db_blocks.push((root, Ok(Some(cached_block))));
|
db_blocks.push((root, Ok(Some(cached_block))));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -554,7 +551,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
|||||||
"Using slower fallback method of eth_getBlockByHash()"
|
"Using slower fallback method of eth_getBlockByHash()"
|
||||||
);
|
);
|
||||||
for root in block_roots {
|
for root in block_roots {
|
||||||
let cached_block = self.check_early_attester_cache(root);
|
let cached_block = self.check_caches(root);
|
||||||
let block_result = if cached_block.is_some() {
|
let block_result = if cached_block.is_some() {
|
||||||
Ok(cached_block)
|
Ok(cached_block)
|
||||||
} else {
|
} else {
|
||||||
@ -682,7 +679,7 @@ impl From<Error> for BeaconChainError {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache};
|
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
|
||||||
use crate::test_utils::{test_spec, BeaconChainHarness, EphemeralHarnessType};
|
use crate::test_utils::{test_spec, BeaconChainHarness, EphemeralHarnessType};
|
||||||
use execution_layer::test_utils::{Block, DEFAULT_ENGINE_CAPABILITIES};
|
use execution_layer::test_utils::{Block, DEFAULT_ENGINE_CAPABILITIES};
|
||||||
use execution_layer::EngineCapabilities;
|
use execution_layer::EngineCapabilities;
|
||||||
@ -804,7 +801,7 @@ mod tests {
|
|||||||
let start = epoch * slots_per_epoch;
|
let start = epoch * slots_per_epoch;
|
||||||
let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch];
|
let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch];
|
||||||
epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]);
|
epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]);
|
||||||
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No)
|
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckCaches::No)
|
||||||
.expect("should create streamer");
|
.expect("should create streamer");
|
||||||
let (block_tx, mut block_rx) = mpsc::unbounded_channel();
|
let (block_tx, mut block_rx) = mpsc::unbounded_channel();
|
||||||
streamer.stream(epoch_roots.clone(), block_tx).await;
|
streamer.stream(epoch_roots.clone(), block_tx).await;
|
||||||
@ -945,7 +942,7 @@ mod tests {
|
|||||||
let start = epoch * slots_per_epoch;
|
let start = epoch * slots_per_epoch;
|
||||||
let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch];
|
let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch];
|
||||||
epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]);
|
epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]);
|
||||||
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No)
|
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckCaches::No)
|
||||||
.expect("should create streamer");
|
.expect("should create streamer");
|
||||||
let (block_tx, mut block_rx) = mpsc::unbounded_channel();
|
let (block_tx, mut block_rx) = mpsc::unbounded_channel();
|
||||||
streamer.stream(epoch_roots.clone(), block_tx).await;
|
streamer.stream(epoch_roots.clone(), block_tx).await;
|
||||||
|
@ -4,7 +4,7 @@ use crate::attestation_verification::{
|
|||||||
VerifiedUnaggregatedAttestation,
|
VerifiedUnaggregatedAttestation,
|
||||||
};
|
};
|
||||||
use crate::attester_cache::{AttesterCache, AttesterCacheKey};
|
use crate::attester_cache::{AttesterCache, AttesterCacheKey};
|
||||||
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache};
|
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
|
||||||
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
|
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
|
||||||
use crate::beacon_proposer_cache::BeaconProposerCache;
|
use crate::beacon_proposer_cache::BeaconProposerCache;
|
||||||
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
|
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
|
||||||
@ -1131,7 +1131,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
/// ## Errors
|
/// ## Errors
|
||||||
///
|
///
|
||||||
/// May return a database error.
|
/// May return a database error.
|
||||||
pub fn get_blocks_checking_early_attester_cache(
|
pub fn get_blocks_checking_caches(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
block_roots: Vec<Hash256>,
|
block_roots: Vec<Hash256>,
|
||||||
executor: &TaskExecutor,
|
executor: &TaskExecutor,
|
||||||
@ -1144,10 +1144,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
>,
|
>,
|
||||||
Error,
|
Error,
|
||||||
> {
|
> {
|
||||||
Ok(
|
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?
|
||||||
BeaconBlockStreamer::<T>::new(self, CheckEarlyAttesterCache::Yes)?
|
.launch_stream(block_roots, executor))
|
||||||
.launch_stream(block_roots, executor),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_blocks(
|
pub fn get_blocks(
|
||||||
@ -1163,10 +1161,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
>,
|
>,
|
||||||
Error,
|
Error,
|
||||||
> {
|
> {
|
||||||
Ok(
|
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?
|
||||||
BeaconBlockStreamer::<T>::new(self, CheckEarlyAttesterCache::No)?
|
.launch_stream(block_roots, executor))
|
||||||
.launch_stream(block_roots, executor),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_blobs_checking_early_attester_cache(
|
pub fn get_blobs_checking_early_attester_cache(
|
||||||
@ -2960,18 +2956,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
unverified_block: B,
|
unverified_block: B,
|
||||||
notify_execution_layer: NotifyExecutionLayer,
|
notify_execution_layer: NotifyExecutionLayer,
|
||||||
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
|
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
|
||||||
if let Ok(commitments) = unverified_block
|
self.data_availability_checker
|
||||||
.block()
|
.notify_block(block_root, unverified_block.block_cloned());
|
||||||
.message()
|
|
||||||
.body()
|
|
||||||
.blob_kzg_commitments()
|
|
||||||
{
|
|
||||||
self.data_availability_checker.notify_block_commitments(
|
|
||||||
unverified_block.block().slot(),
|
|
||||||
block_root,
|
|
||||||
commitments.clone(),
|
|
||||||
);
|
|
||||||
};
|
|
||||||
let r = self
|
let r = self
|
||||||
.process_block(block_root, unverified_block, notify_execution_layer, || {
|
.process_block(block_root, unverified_block, notify_execution_layer, || {
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -764,6 +764,7 @@ pub trait IntoExecutionPendingBlock<T: BeaconChainTypes>: Sized {
|
|||||||
) -> Result<ExecutionPendingBlock<T>, BlockSlashInfo<BlockError<T::EthSpec>>>;
|
) -> Result<ExecutionPendingBlock<T>, BlockSlashInfo<BlockError<T::EthSpec>>>;
|
||||||
|
|
||||||
fn block(&self) -> &SignedBeaconBlock<T::EthSpec>;
|
fn block(&self) -> &SignedBeaconBlock<T::EthSpec>;
|
||||||
|
fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
|
impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
|
||||||
@ -1017,6 +1018,10 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for GossipVerifiedBlock<T
|
|||||||
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
|
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
|
||||||
self.block.as_block()
|
self.block.as_block()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
|
||||||
|
self.block.clone()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
|
impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
|
||||||
@ -1168,6 +1173,10 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for SignatureVerifiedBloc
|
|||||||
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
|
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
|
||||||
self.block.as_block()
|
self.block.as_block()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
|
||||||
|
self.block.block_cloned()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for Arc<SignedBeaconBlock<T::EthSpec>> {
|
impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for Arc<SignedBeaconBlock<T::EthSpec>> {
|
||||||
@ -1198,6 +1207,10 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for Arc<SignedBeaconBlock
|
|||||||
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
|
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
|
||||||
|
self.clone()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for RpcBlock<T::EthSpec> {
|
impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for RpcBlock<T::EthSpec> {
|
||||||
@ -1228,6 +1241,10 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for RpcBlock<T::EthSpec>
|
|||||||
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
|
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
|
||||||
self.as_block()
|
self.as_block()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
|
||||||
|
self.block_cloned()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
|
impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
|
||||||
|
@ -46,6 +46,13 @@ impl<E: EthSpec> RpcBlock<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
|
||||||
|
match &self.block {
|
||||||
|
RpcBlockInner::Block(block) => block.clone(),
|
||||||
|
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
|
pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
|
||||||
match &self.block {
|
match &self.block {
|
||||||
RpcBlockInner::Block(_) => None,
|
RpcBlockInner::Block(_) => None,
|
||||||
|
@ -20,7 +20,7 @@ use std::fmt::Debug;
|
|||||||
use std::num::NonZeroUsize;
|
use std::num::NonZeroUsize;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments};
|
use types::beacon_block_body::KzgCommitmentOpts;
|
||||||
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
|
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
|
||||||
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
|
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
|
||||||
|
|
||||||
@ -192,6 +192,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
|||||||
self.availability_cache.peek_blob(blob_id)
|
self.availability_cache.peek_blob(blob_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a block from the availability cache. Includes any blocks we are currently processing.
|
||||||
|
pub fn get_block(&self, block_root: &Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
|
||||||
|
self.processing_cache
|
||||||
|
.read()
|
||||||
|
.get(block_root)
|
||||||
|
.and_then(|cached| cached.block.clone())
|
||||||
|
}
|
||||||
|
|
||||||
/// Put a list of blobs received via RPC into the availability cache. This performs KZG
|
/// Put a list of blobs received via RPC into the availability cache. This performs KZG
|
||||||
/// verification on the blobs in the list.
|
/// verification on the blobs in the list.
|
||||||
pub fn put_rpc_blobs(
|
pub fn put_rpc_blobs(
|
||||||
@ -344,20 +352,16 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
|||||||
block.num_expected_blobs() > 0 && self.da_check_required_for_epoch(block.epoch())
|
block.num_expected_blobs() > 0 && self.da_check_required_for_epoch(block.epoch())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adds block commitments to the processing cache. These commitments are unverified but caching
|
/// Adds a block to the processing cache. This block's commitments are unverified but caching
|
||||||
/// them here is useful to avoid duplicate downloads of blocks, as well as understanding
|
/// them here is useful to avoid duplicate downloads of blocks, as well as understanding
|
||||||
/// our blob download requirements.
|
/// our blob download requirements. We will also serve this over RPC.
|
||||||
pub fn notify_block_commitments(
|
pub fn notify_block(&self, block_root: Hash256, block: Arc<SignedBeaconBlock<T::EthSpec>>) {
|
||||||
&self,
|
let slot = block.slot();
|
||||||
slot: Slot,
|
|
||||||
block_root: Hash256,
|
|
||||||
commitments: KzgCommitments<T::EthSpec>,
|
|
||||||
) {
|
|
||||||
self.processing_cache
|
self.processing_cache
|
||||||
.write()
|
.write()
|
||||||
.entry(block_root)
|
.entry(block_root)
|
||||||
.or_insert_with(|| ProcessingComponents::new(slot))
|
.or_insert_with(|| ProcessingComponents::new(slot))
|
||||||
.merge_block(commitments);
|
.merge_block(block);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a single blob commitment to the processing cache. This commitment is unverified but caching
|
/// Add a single blob commitment to the processing cache. This commitment is unverified but caching
|
||||||
@ -450,6 +454,24 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
|||||||
pub fn persist_all(&self) -> Result<(), AvailabilityCheckError> {
|
pub fn persist_all(&self) -> Result<(), AvailabilityCheckError> {
|
||||||
self.availability_cache.write_all_to_disk()
|
self.availability_cache.write_all_to_disk()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Collects metrics from the data availability checker.
|
||||||
|
pub fn metrics(&self) -> DataAvailabilityCheckerMetrics {
|
||||||
|
DataAvailabilityCheckerMetrics {
|
||||||
|
processing_cache_size: self.processing_cache.read().len(),
|
||||||
|
num_store_entries: self.availability_cache.num_store_entries(),
|
||||||
|
state_cache_size: self.availability_cache.state_cache_size(),
|
||||||
|
block_cache_size: self.availability_cache.block_cache_size(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper struct to group data availability checker metrics.
|
||||||
|
pub struct DataAvailabilityCheckerMetrics {
|
||||||
|
pub processing_cache_size: usize,
|
||||||
|
pub num_store_entries: usize,
|
||||||
|
pub state_cache_size: usize,
|
||||||
|
pub block_cache_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start_availability_cache_maintenance_service<T: BeaconChainTypes>(
|
pub fn start_availability_cache_maintenance_service<T: BeaconChainTypes>(
|
||||||
@ -597,6 +619,15 @@ pub enum MaybeAvailableBlock<E: EthSpec> {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<E: EthSpec> MaybeAvailableBlock<E> {
|
||||||
|
pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
|
||||||
|
match self {
|
||||||
|
Self::Available(block) => block.block_cloned(),
|
||||||
|
Self::AvailabilityPending { block, .. } => block.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum MissingBlobs {
|
pub enum MissingBlobs {
|
||||||
/// We know for certain these blobs are missing.
|
/// We know for certain these blobs are missing.
|
||||||
|
@ -182,9 +182,9 @@ macro_rules! impl_availability_view {
|
|||||||
|
|
||||||
impl_availability_view!(
|
impl_availability_view!(
|
||||||
ProcessingComponents,
|
ProcessingComponents,
|
||||||
KzgCommitments<E>,
|
Arc<SignedBeaconBlock<E>>,
|
||||||
KzgCommitment,
|
KzgCommitment,
|
||||||
block_commitments,
|
block,
|
||||||
blob_commitments
|
blob_commitments
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -212,12 +212,6 @@ pub trait GetCommitment<E: EthSpec> {
|
|||||||
fn get_commitment(&self) -> &KzgCommitment;
|
fn get_commitment(&self) -> &KzgCommitment;
|
||||||
}
|
}
|
||||||
|
|
||||||
// These implementations are required to implement `AvailabilityView` for `ProcessingView`.
|
|
||||||
impl<E: EthSpec> GetCommitments<E> for KzgCommitments<E> {
|
|
||||||
fn get_commitments(&self) -> KzgCommitments<E> {
|
|
||||||
self.clone()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl<E: EthSpec> GetCommitment<E> for KzgCommitment {
|
impl<E: EthSpec> GetCommitment<E> for KzgCommitment {
|
||||||
fn get_commitment(&self) -> &KzgCommitment {
|
fn get_commitment(&self) -> &KzgCommitment {
|
||||||
self
|
self
|
||||||
@ -310,7 +304,7 @@ pub mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type ProcessingViewSetup<E> = (
|
type ProcessingViewSetup<E> = (
|
||||||
KzgCommitments<E>,
|
Arc<SignedBeaconBlock<E>>,
|
||||||
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
|
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
|
||||||
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
|
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
|
||||||
);
|
);
|
||||||
@ -320,12 +314,6 @@ pub mod tests {
|
|||||||
valid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
|
valid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
|
||||||
invalid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
|
invalid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
|
||||||
) -> ProcessingViewSetup<E> {
|
) -> ProcessingViewSetup<E> {
|
||||||
let commitments = block
|
|
||||||
.message()
|
|
||||||
.body()
|
|
||||||
.blob_kzg_commitments()
|
|
||||||
.unwrap()
|
|
||||||
.clone();
|
|
||||||
let blobs = FixedVector::from(
|
let blobs = FixedVector::from(
|
||||||
valid_blobs
|
valid_blobs
|
||||||
.iter()
|
.iter()
|
||||||
@ -338,7 +326,7 @@ pub mod tests {
|
|||||||
.map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment))
|
.map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment))
|
||||||
.collect::<Vec<_>>(),
|
.collect::<Vec<_>>(),
|
||||||
);
|
);
|
||||||
(commitments, blobs, invalid_blobs)
|
(Arc::new(block), blobs, invalid_blobs)
|
||||||
}
|
}
|
||||||
|
|
||||||
type PendingComponentsSetup<E> = (
|
type PendingComponentsSetup<E> = (
|
||||||
|
@ -363,6 +363,16 @@ impl<T: BeaconChainTypes> Critical<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the number of pending component entries in memory.
|
||||||
|
pub fn num_blocks(&self) -> usize {
|
||||||
|
self.in_memory.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the number of entries that have overflowed to disk.
|
||||||
|
pub fn num_store_entries(&self) -> usize {
|
||||||
|
self.store_keys.len()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This is the main struct for this module. Outside methods should
|
/// This is the main struct for this module. Outside methods should
|
||||||
@ -671,6 +681,21 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
|
|||||||
pub fn state_lru_cache(&self) -> &StateLRUCache<T> {
|
pub fn state_lru_cache(&self) -> &StateLRUCache<T> {
|
||||||
&self.state_cache
|
&self.state_cache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Number of states stored in memory in the cache.
|
||||||
|
pub fn state_cache_size(&self) -> usize {
|
||||||
|
self.state_cache.lru_cache().read().len()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Number of pending component entries in memory in the cache.
|
||||||
|
pub fn block_cache_size(&self) -> usize {
|
||||||
|
self.critical.read().num_blocks()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the number of entries in the cache that have overflowed to disk.
|
||||||
|
pub fn num_store_entries(&self) -> usize {
|
||||||
|
self.critical.read().num_store_entries()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ssz::Encode for OverflowKey {
|
impl ssz::Encode for OverflowKey {
|
||||||
|
@ -1,8 +1,9 @@
|
|||||||
use crate::data_availability_checker::AvailabilityView;
|
use crate::data_availability_checker::AvailabilityView;
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments};
|
use std::sync::Arc;
|
||||||
use types::{EthSpec, Hash256, Slot};
|
use types::beacon_block_body::KzgCommitmentOpts;
|
||||||
|
use types::{EthSpec, Hash256, SignedBeaconBlock, Slot};
|
||||||
|
|
||||||
/// This cache is used only for gossip blocks/blobs and single block/blob lookups, to give req/resp
|
/// This cache is used only for gossip blocks/blobs and single block/blob lookups, to give req/resp
|
||||||
/// a view of what we have and what we require. This cache serves a slightly different purpose than
|
/// a view of what we have and what we require. This cache serves a slightly different purpose than
|
||||||
@ -37,6 +38,9 @@ impl<E: EthSpec> ProcessingCache<E> {
|
|||||||
}
|
}
|
||||||
roots_missing_components
|
roots_missing_components
|
||||||
}
|
}
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.processing_cache.len()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@ -45,7 +49,7 @@ pub struct ProcessingComponents<E: EthSpec> {
|
|||||||
/// Blobs required for a block can only be known if we have seen the block. So `Some` here
|
/// Blobs required for a block can only be known if we have seen the block. So `Some` here
|
||||||
/// means we've seen it, a `None` means we haven't. The `kzg_commitments` value helps us figure
|
/// means we've seen it, a `None` means we haven't. The `kzg_commitments` value helps us figure
|
||||||
/// out whether incoming blobs actually match the block.
|
/// out whether incoming blobs actually match the block.
|
||||||
pub block_commitments: Option<KzgCommitments<E>>,
|
pub block: Option<Arc<SignedBeaconBlock<E>>>,
|
||||||
/// `KzgCommitments` for blobs are always known, even if we haven't seen the block. See
|
/// `KzgCommitments` for blobs are always known, even if we haven't seen the block. See
|
||||||
/// `AvailabilityView`'s trait definition for more details.
|
/// `AvailabilityView`'s trait definition for more details.
|
||||||
pub blob_commitments: KzgCommitmentOpts<E>,
|
pub blob_commitments: KzgCommitmentOpts<E>,
|
||||||
@ -55,7 +59,7 @@ impl<E: EthSpec> ProcessingComponents<E> {
|
|||||||
pub fn new(slot: Slot) -> Self {
|
pub fn new(slot: Slot) -> Self {
|
||||||
Self {
|
Self {
|
||||||
slot,
|
slot,
|
||||||
block_commitments: None,
|
block: None,
|
||||||
blob_commitments: KzgCommitmentOpts::<E>::default(),
|
blob_commitments: KzgCommitmentOpts::<E>::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -67,7 +71,7 @@ impl<E: EthSpec> ProcessingComponents<E> {
|
|||||||
pub fn empty(_block_root: Hash256) -> Self {
|
pub fn empty(_block_root: Hash256) -> Self {
|
||||||
Self {
|
Self {
|
||||||
slot: Slot::new(0),
|
slot: Slot::new(0),
|
||||||
block_commitments: None,
|
block: None,
|
||||||
blob_commitments: KzgCommitmentOpts::<E>::default(),
|
blob_commitments: KzgCommitmentOpts::<E>::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -190,8 +190,7 @@ impl<T: BeaconChainTypes> StateLRUCache<T> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// returns the state cache for inspection in tests
|
/// returns the state cache for inspection
|
||||||
#[cfg(test)]
|
|
||||||
pub fn lru_cache(&self) -> &RwLock<LruCache<Hash256, BeaconState<T::EthSpec>>> {
|
pub fn lru_cache(&self) -> &RwLock<LruCache<Hash256, BeaconState<T::EthSpec>>> {
|
||||||
&self.states
|
&self.states
|
||||||
}
|
}
|
||||||
|
@ -1129,6 +1129,31 @@ lazy_static! {
|
|||||||
Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0])
|
Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0])
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Data Availability cache metrics
|
||||||
|
*/
|
||||||
|
pub static ref DATA_AVAILABILITY_PROCESSING_CACHE_SIZE: Result<IntGauge> =
|
||||||
|
try_create_int_gauge(
|
||||||
|
"data_availability_processing_cache_size",
|
||||||
|
"Number of entries in the data availability processing cache."
|
||||||
|
);
|
||||||
|
pub static ref DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE: Result<IntGauge> =
|
||||||
|
try_create_int_gauge(
|
||||||
|
"data_availability_overflow_memory_block_cache_size",
|
||||||
|
"Number of entries in the data availability overflow block memory cache."
|
||||||
|
);
|
||||||
|
pub static ref DATA_AVAILABILITY_OVERFLOW_MEMORY_STATE_CACHE_SIZE: Result<IntGauge> =
|
||||||
|
try_create_int_gauge(
|
||||||
|
"data_availability_overflow_memory_state_cache_size",
|
||||||
|
"Number of entries in the data availability overflow state memory cache."
|
||||||
|
);
|
||||||
|
pub static ref DATA_AVAILABILITY_OVERFLOW_STORE_CACHE_SIZE: Result<IntGauge> =
|
||||||
|
try_create_int_gauge(
|
||||||
|
"data_availability_overflow_store_cache_size",
|
||||||
|
"Number of entries in the data availability overflow store cache."
|
||||||
|
);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* light_client server metrics
|
* light_client server metrics
|
||||||
*/
|
*/
|
||||||
@ -1171,6 +1196,24 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let da_checker_metrics = beacon_chain.data_availability_checker.metrics();
|
||||||
|
set_gauge_by_usize(
|
||||||
|
&DATA_AVAILABILITY_PROCESSING_CACHE_SIZE,
|
||||||
|
da_checker_metrics.processing_cache_size,
|
||||||
|
);
|
||||||
|
set_gauge_by_usize(
|
||||||
|
&DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE,
|
||||||
|
da_checker_metrics.block_cache_size,
|
||||||
|
);
|
||||||
|
set_gauge_by_usize(
|
||||||
|
&DATA_AVAILABILITY_OVERFLOW_MEMORY_STATE_CACHE_SIZE,
|
||||||
|
da_checker_metrics.state_cache_size,
|
||||||
|
);
|
||||||
|
set_gauge_by_usize(
|
||||||
|
&DATA_AVAILABILITY_OVERFLOW_STORE_CACHE_SIZE,
|
||||||
|
da_checker_metrics.num_store_entries,
|
||||||
|
);
|
||||||
|
|
||||||
if let Some((size, num_lookups)) = beacon_chain.pre_finalization_block_cache.metrics() {
|
if let Some((size, num_lookups)) = beacon_chain.pre_finalization_block_cache.metrics() {
|
||||||
set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_CACHE_SIZE, size);
|
set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_CACHE_SIZE, size);
|
||||||
set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_LOOKUP_COUNT, num_lookups);
|
set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_LOOKUP_COUNT, num_lookups);
|
||||||
|
@ -140,7 +140,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
let requested_blocks = request.block_roots().len();
|
let requested_blocks = request.block_roots().len();
|
||||||
let mut block_stream = match self
|
let mut block_stream = match self
|
||||||
.chain
|
.chain
|
||||||
.get_blocks_checking_early_attester_cache(request.block_roots().to_vec(), &executor)
|
.get_blocks_checking_caches(request.block_roots().to_vec(), &executor)
|
||||||
{
|
{
|
||||||
Ok(block_stream) => block_stream,
|
Ok(block_stream) => block_stream,
|
||||||
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
|
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
|
||||||
|
Loading…
Reference in New Issue
Block a user