From f21472991dbbc9e6e18310a01fc2c4d9ce7c7348 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Sun, 18 Feb 2024 21:22:15 -0500 Subject: [PATCH] check the da cache and the attester cache in responding to RPC requests (#5138) * check the da cache and the attester cache in responding to RPC requests * use the processing cache instead * update comment * add da cache metrics * rename early attester cache method * Merge branch 'unstable' of https://github.com/sigp/lighthouse into check-da-cache-in-rpc-response * make rustup update run on the runners * Revert "make rustup update run on the runners" This reverts commit d097e9bfa84a13b1d7813c03df38e7756fb0bfc5. --- .../beacon_chain/src/beacon_block_streamer.rs | 33 ++++++------ beacon_node/beacon_chain/src/beacon_chain.rs | 30 +++-------- .../beacon_chain/src/block_verification.rs | 17 +++++++ .../src/block_verification_types.rs | 7 +++ .../src/data_availability_checker.rs | 51 +++++++++++++++---- .../availability_view.rs | 20 ++------ .../overflow_lru_cache.rs | 25 +++++++++ .../processing_cache.rs | 14 +++-- .../state_lru_cache.rs | 3 +- beacon_node/beacon_chain/src/metrics.rs | 43 ++++++++++++++++ .../network_beacon_processor/rpc_methods.rs | 2 +- 11 files changed, 171 insertions(+), 74 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs index 9312d4511..4f4f8ed1f 100644 --- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs +++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs @@ -19,7 +19,7 @@ use types::{ }; #[derive(PartialEq)] -pub enum CheckEarlyAttesterCache { +pub enum CheckCaches { Yes, No, } @@ -385,14 +385,14 @@ impl EngineRequest { pub struct BeaconBlockStreamer { execution_layer: ExecutionLayer, - check_early_attester_cache: CheckEarlyAttesterCache, + check_caches: CheckCaches, beacon_chain: Arc>, } impl BeaconBlockStreamer { pub fn new( beacon_chain: &Arc>, - check_early_attester_cache: CheckEarlyAttesterCache, + check_caches: CheckCaches, ) -> Result { let execution_layer = beacon_chain .execution_layer @@ -402,17 +402,17 @@ impl BeaconBlockStreamer { Ok(Self { execution_layer, - check_early_attester_cache, + check_caches, beacon_chain: beacon_chain.clone(), }) } - fn check_early_attester_cache( - &self, - root: Hash256, - ) -> Option>> { - if self.check_early_attester_cache == CheckEarlyAttesterCache::Yes { - self.beacon_chain.early_attester_cache.get_block(root) + fn check_caches(&self, root: Hash256) -> Option>> { + if self.check_caches == CheckCaches::Yes { + self.beacon_chain + .data_availability_checker + .get_block(&root) + .or(self.beacon_chain.early_attester_cache.get_block(root)) } else { None } @@ -422,10 +422,7 @@ impl BeaconBlockStreamer { let mut db_blocks = Vec::new(); for root in block_roots { - if let Some(cached_block) = self - .check_early_attester_cache(root) - .map(LoadedBeaconBlock::Full) - { + if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) { db_blocks.push((root, Ok(Some(cached_block)))); continue; } @@ -554,7 +551,7 @@ impl BeaconBlockStreamer { "Using slower fallback method of eth_getBlockByHash()" ); for root in block_roots { - let cached_block = self.check_early_attester_cache(root); + let cached_block = self.check_caches(root); let block_result = if cached_block.is_some() { Ok(cached_block) } else { @@ -682,7 +679,7 @@ impl From for BeaconChainError { #[cfg(test)] mod tests { - use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache}; + use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches}; use crate::test_utils::{test_spec, BeaconChainHarness, EphemeralHarnessType}; use execution_layer::test_utils::{Block, DEFAULT_ENGINE_CAPABILITIES}; use execution_layer::EngineCapabilities; @@ -804,7 +801,7 @@ mod tests { let start = epoch * slots_per_epoch; let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch]; epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]); - let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No) + let streamer = BeaconBlockStreamer::new(&harness.chain, CheckCaches::No) .expect("should create streamer"); let (block_tx, mut block_rx) = mpsc::unbounded_channel(); streamer.stream(epoch_roots.clone(), block_tx).await; @@ -945,7 +942,7 @@ mod tests { let start = epoch * slots_per_epoch; let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch]; epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]); - let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No) + let streamer = BeaconBlockStreamer::new(&harness.chain, CheckCaches::No) .expect("should create streamer"); let (block_tx, mut block_rx) = mpsc::unbounded_channel(); streamer.stream(epoch_roots.clone(), block_tx).await; diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b05ba01ee..20a93e31e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4,7 +4,7 @@ use crate::attestation_verification::{ VerifiedUnaggregatedAttestation, }; use crate::attester_cache::{AttesterCache, AttesterCacheKey}; -use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache}; +use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; @@ -1131,7 +1131,7 @@ impl BeaconChain { /// ## Errors /// /// May return a database error. - pub fn get_blocks_checking_early_attester_cache( + pub fn get_blocks_checking_caches( self: &Arc, block_roots: Vec, executor: &TaskExecutor, @@ -1144,10 +1144,8 @@ impl BeaconChain { >, Error, > { - Ok( - BeaconBlockStreamer::::new(self, CheckEarlyAttesterCache::Yes)? - .launch_stream(block_roots, executor), - ) + Ok(BeaconBlockStreamer::::new(self, CheckCaches::Yes)? + .launch_stream(block_roots, executor)) } pub fn get_blocks( @@ -1163,10 +1161,8 @@ impl BeaconChain { >, Error, > { - Ok( - BeaconBlockStreamer::::new(self, CheckEarlyAttesterCache::No)? - .launch_stream(block_roots, executor), - ) + Ok(BeaconBlockStreamer::::new(self, CheckCaches::No)? + .launch_stream(block_roots, executor)) } pub fn get_blobs_checking_early_attester_cache( @@ -2960,18 +2956,8 @@ impl BeaconChain { unverified_block: B, notify_execution_layer: NotifyExecutionLayer, ) -> Result> { - if let Ok(commitments) = unverified_block - .block() - .message() - .body() - .blob_kzg_commitments() - { - self.data_availability_checker.notify_block_commitments( - unverified_block.block().slot(), - block_root, - commitments.clone(), - ); - }; + self.data_availability_checker + .notify_block(block_root, unverified_block.block_cloned()); let r = self .process_block(block_root, unverified_block, notify_execution_layer, || { Ok(()) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index e8df5b811..ac3d3e3ab 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -764,6 +764,7 @@ pub trait IntoExecutionPendingBlock: Sized { ) -> Result, BlockSlashInfo>>; fn block(&self) -> &SignedBeaconBlock; + fn block_cloned(&self) -> Arc>; } impl GossipVerifiedBlock { @@ -1017,6 +1018,10 @@ impl IntoExecutionPendingBlock for GossipVerifiedBlock &SignedBeaconBlock { self.block.as_block() } + + fn block_cloned(&self) -> Arc> { + self.block.clone() + } } impl SignatureVerifiedBlock { @@ -1168,6 +1173,10 @@ impl IntoExecutionPendingBlock for SignatureVerifiedBloc fn block(&self) -> &SignedBeaconBlock { self.block.as_block() } + + fn block_cloned(&self) -> Arc> { + self.block.block_cloned() + } } impl IntoExecutionPendingBlock for Arc> { @@ -1198,6 +1207,10 @@ impl IntoExecutionPendingBlock for Arc &SignedBeaconBlock { self } + + fn block_cloned(&self) -> Arc> { + self.clone() + } } impl IntoExecutionPendingBlock for RpcBlock { @@ -1228,6 +1241,10 @@ impl IntoExecutionPendingBlock for RpcBlock fn block(&self) -> &SignedBeaconBlock { self.as_block() } + + fn block_cloned(&self) -> Arc> { + self.block_cloned() + } } impl ExecutionPendingBlock { diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index a6840ed76..edba7a211 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -46,6 +46,13 @@ impl RpcBlock { } } + pub fn block_cloned(&self) -> Arc> { + match &self.block { + RpcBlockInner::Block(block) => block.clone(), + RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), + } + } + pub fn blobs(&self) -> Option<&BlobSidecarList> { match &self.block { RpcBlockInner::Block(_) => None, diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 48d505e9e..f906032ec 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -20,7 +20,7 @@ use std::fmt::Debug; use std::num::NonZeroUsize; use std::sync::Arc; use task_executor::TaskExecutor; -use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments}; +use types::beacon_block_body::KzgCommitmentOpts; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; @@ -192,6 +192,14 @@ impl DataAvailabilityChecker { self.availability_cache.peek_blob(blob_id) } + /// Get a block from the availability cache. Includes any blocks we are currently processing. + pub fn get_block(&self, block_root: &Hash256) -> Option>> { + self.processing_cache + .read() + .get(block_root) + .and_then(|cached| cached.block.clone()) + } + /// Put a list of blobs received via RPC into the availability cache. This performs KZG /// verification on the blobs in the list. pub fn put_rpc_blobs( @@ -344,20 +352,16 @@ impl DataAvailabilityChecker { block.num_expected_blobs() > 0 && self.da_check_required_for_epoch(block.epoch()) } - /// Adds block commitments to the processing cache. These commitments are unverified but caching + /// Adds a block to the processing cache. This block's commitments are unverified but caching /// them here is useful to avoid duplicate downloads of blocks, as well as understanding - /// our blob download requirements. - pub fn notify_block_commitments( - &self, - slot: Slot, - block_root: Hash256, - commitments: KzgCommitments, - ) { + /// our blob download requirements. We will also serve this over RPC. + pub fn notify_block(&self, block_root: Hash256, block: Arc>) { + let slot = block.slot(); self.processing_cache .write() .entry(block_root) .or_insert_with(|| ProcessingComponents::new(slot)) - .merge_block(commitments); + .merge_block(block); } /// Add a single blob commitment to the processing cache. This commitment is unverified but caching @@ -450,6 +454,24 @@ impl DataAvailabilityChecker { pub fn persist_all(&self) -> Result<(), AvailabilityCheckError> { self.availability_cache.write_all_to_disk() } + + /// Collects metrics from the data availability checker. + pub fn metrics(&self) -> DataAvailabilityCheckerMetrics { + DataAvailabilityCheckerMetrics { + processing_cache_size: self.processing_cache.read().len(), + num_store_entries: self.availability_cache.num_store_entries(), + state_cache_size: self.availability_cache.state_cache_size(), + block_cache_size: self.availability_cache.block_cache_size(), + } + } +} + +/// Helper struct to group data availability checker metrics. +pub struct DataAvailabilityCheckerMetrics { + pub processing_cache_size: usize, + pub num_store_entries: usize, + pub state_cache_size: usize, + pub block_cache_size: usize, } pub fn start_availability_cache_maintenance_service( @@ -597,6 +619,15 @@ pub enum MaybeAvailableBlock { }, } +impl MaybeAvailableBlock { + pub fn block_cloned(&self) -> Arc> { + match self { + Self::Available(block) => block.block_cloned(), + Self::AvailabilityPending { block, .. } => block.clone(), + } + } +} + #[derive(Debug, Clone)] pub enum MissingBlobs { /// We know for certain these blobs are missing. diff --git a/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs b/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs index 776f81ee5..65093db26 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs @@ -182,9 +182,9 @@ macro_rules! impl_availability_view { impl_availability_view!( ProcessingComponents, - KzgCommitments, + Arc>, KzgCommitment, - block_commitments, + block, blob_commitments ); @@ -212,12 +212,6 @@ pub trait GetCommitment { fn get_commitment(&self) -> &KzgCommitment; } -// These implementations are required to implement `AvailabilityView` for `ProcessingView`. -impl GetCommitments for KzgCommitments { - fn get_commitments(&self) -> KzgCommitments { - self.clone() - } -} impl GetCommitment for KzgCommitment { fn get_commitment(&self) -> &KzgCommitment { self @@ -310,7 +304,7 @@ pub mod tests { } type ProcessingViewSetup = ( - KzgCommitments, + Arc>, FixedVector, ::MaxBlobsPerBlock>, FixedVector, ::MaxBlobsPerBlock>, ); @@ -320,12 +314,6 @@ pub mod tests { valid_blobs: FixedVector>>, ::MaxBlobsPerBlock>, invalid_blobs: FixedVector>>, ::MaxBlobsPerBlock>, ) -> ProcessingViewSetup { - let commitments = block - .message() - .body() - .blob_kzg_commitments() - .unwrap() - .clone(); let blobs = FixedVector::from( valid_blobs .iter() @@ -338,7 +326,7 @@ pub mod tests { .map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment)) .collect::>(), ); - (commitments, blobs, invalid_blobs) + (Arc::new(block), blobs, invalid_blobs) } type PendingComponentsSetup = ( diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 34c9bc76f..80cbc6c89 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -363,6 +363,16 @@ impl Critical { } } } + + /// Returns the number of pending component entries in memory. + pub fn num_blocks(&self) -> usize { + self.in_memory.len() + } + + /// Returns the number of entries that have overflowed to disk. + pub fn num_store_entries(&self) -> usize { + self.store_keys.len() + } } /// This is the main struct for this module. Outside methods should @@ -671,6 +681,21 @@ impl OverflowLRUCache { pub fn state_lru_cache(&self) -> &StateLRUCache { &self.state_cache } + + /// Number of states stored in memory in the cache. + pub fn state_cache_size(&self) -> usize { + self.state_cache.lru_cache().read().len() + } + + /// Number of pending component entries in memory in the cache. + pub fn block_cache_size(&self) -> usize { + self.critical.read().num_blocks() + } + + /// Returns the number of entries in the cache that have overflowed to disk. + pub fn num_store_entries(&self) -> usize { + self.critical.read().num_store_entries() + } } impl ssz::Encode for OverflowKey { diff --git a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs index 969034c65..af94803dc 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs @@ -1,8 +1,9 @@ use crate::data_availability_checker::AvailabilityView; use std::collections::hash_map::Entry; use std::collections::HashMap; -use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments}; -use types::{EthSpec, Hash256, Slot}; +use std::sync::Arc; +use types::beacon_block_body::KzgCommitmentOpts; +use types::{EthSpec, Hash256, SignedBeaconBlock, Slot}; /// This cache is used only for gossip blocks/blobs and single block/blob lookups, to give req/resp /// a view of what we have and what we require. This cache serves a slightly different purpose than @@ -37,6 +38,9 @@ impl ProcessingCache { } roots_missing_components } + pub fn len(&self) -> usize { + self.processing_cache.len() + } } #[derive(Debug, Clone)] @@ -45,7 +49,7 @@ pub struct ProcessingComponents { /// Blobs required for a block can only be known if we have seen the block. So `Some` here /// means we've seen it, a `None` means we haven't. The `kzg_commitments` value helps us figure /// out whether incoming blobs actually match the block. - pub block_commitments: Option>, + pub block: Option>>, /// `KzgCommitments` for blobs are always known, even if we haven't seen the block. See /// `AvailabilityView`'s trait definition for more details. pub blob_commitments: KzgCommitmentOpts, @@ -55,7 +59,7 @@ impl ProcessingComponents { pub fn new(slot: Slot) -> Self { Self { slot, - block_commitments: None, + block: None, blob_commitments: KzgCommitmentOpts::::default(), } } @@ -67,7 +71,7 @@ impl ProcessingComponents { pub fn empty(_block_root: Hash256) -> Self { Self { slot: Slot::new(0), - block_commitments: None, + block: None, blob_commitments: KzgCommitmentOpts::::default(), } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index bd125a7f4..35c114db5 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -190,8 +190,7 @@ impl StateLRUCache { }) } - /// returns the state cache for inspection in tests - #[cfg(test)] + /// returns the state cache for inspection pub fn lru_cache(&self) -> &RwLock>> { &self.states } diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 24c05e01f..abac2c80e 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1129,6 +1129,31 @@ lazy_static! { Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0]) ); + + /* + * Data Availability cache metrics + */ + pub static ref DATA_AVAILABILITY_PROCESSING_CACHE_SIZE: Result = + try_create_int_gauge( + "data_availability_processing_cache_size", + "Number of entries in the data availability processing cache." + ); + pub static ref DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE: Result = + try_create_int_gauge( + "data_availability_overflow_memory_block_cache_size", + "Number of entries in the data availability overflow block memory cache." + ); + pub static ref DATA_AVAILABILITY_OVERFLOW_MEMORY_STATE_CACHE_SIZE: Result = + try_create_int_gauge( + "data_availability_overflow_memory_state_cache_size", + "Number of entries in the data availability overflow state memory cache." + ); + pub static ref DATA_AVAILABILITY_OVERFLOW_STORE_CACHE_SIZE: Result = + try_create_int_gauge( + "data_availability_overflow_store_cache_size", + "Number of entries in the data availability overflow store cache." + ); + /* * light_client server metrics */ @@ -1171,6 +1196,24 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { ) } + let da_checker_metrics = beacon_chain.data_availability_checker.metrics(); + set_gauge_by_usize( + &DATA_AVAILABILITY_PROCESSING_CACHE_SIZE, + da_checker_metrics.processing_cache_size, + ); + set_gauge_by_usize( + &DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE, + da_checker_metrics.block_cache_size, + ); + set_gauge_by_usize( + &DATA_AVAILABILITY_OVERFLOW_MEMORY_STATE_CACHE_SIZE, + da_checker_metrics.state_cache_size, + ); + set_gauge_by_usize( + &DATA_AVAILABILITY_OVERFLOW_STORE_CACHE_SIZE, + da_checker_metrics.num_store_entries, + ); + if let Some((size, num_lookups)) = beacon_chain.pre_finalization_block_cache.metrics() { set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_CACHE_SIZE, size); set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_LOOKUP_COUNT, num_lookups); diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index a731dea7c..66c98ff3b 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -140,7 +140,7 @@ impl NetworkBeaconProcessor { let requested_blocks = request.block_roots().len(); let mut block_stream = match self .chain - .get_blocks_checking_early_attester_cache(request.block_roots().to_vec(), &executor) + .get_blocks_checking_caches(request.block_roots().to_vec(), &executor) { Ok(block_stream) => block_stream, Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),