From f58086333710f48ecec5b5f26af64476fa7e3a7e Mon Sep 17 00:00:00 2001
From: realbigsean
Date: Mon, 27 Mar 2023 10:09:53 -0400
Subject: [PATCH] Add simple pruning to data availability checker (#4132)

* use block wrapper in sync pairing

* add pruning to the data availability checker

* remove unused function, rename function
---
 .../beacon_chain/src/block_verification.rs   |  42 +++++
 .../src/data_availability_checker.rs         | 143 ++++++++++--------
 2 files changed, 120 insertions(+), 65 deletions(-)

diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs
index 4cadc86bd..7c1371468 100644
--- a/beacon_node/beacon_chain/src/block_verification.rs
+++ b/beacon_node/beacon_chain/src/block_verification.rs
@@ -93,6 +93,7 @@ use std::time::Duration;
 use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
 use task_executor::JoinHandle;
 use tree_hash::TreeHash;
+use types::blob_sidecar::BlobIdentifier;
 use types::ExecPayload;
 use types::{
     BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch,
@@ -735,6 +736,23 @@ impl<E: EthSpec> AvailableExecutedBlock<E> {
             payload_verification_outcome,
         }
     }
+
+    pub fn get_all_blob_ids(&self) -> Vec<BlobIdentifier> {
+        let num_blobs_expected = self
+            .block
+            .message()
+            .body()
+            .blob_kzg_commitments()
+            .map_or(0, |commitments| commitments.len());
+        let mut blob_ids = Vec::with_capacity(num_blobs_expected);
+        for i in 0..num_blobs_expected {
+            blob_ids.push(BlobIdentifier {
+                block_root: self.import_data.block_root,
+                index: i as u64,
+            });
+        }
+        blob_ids
+    }
 }
 
 pub struct AvailabilityPendingExecutedBlock<E: EthSpec> {
@@ -755,6 +773,30 @@ impl<E: EthSpec> AvailabilityPendingExecutedBlock<E> {
             payload_verification_outcome,
         }
     }
+
+    pub fn num_blobs_expected(&self) -> usize {
+        self.block
+            .kzg_commitments()
+            .map_or(0, |commitments| commitments.len())
+    }
+
+    pub fn get_all_blob_ids(&self) -> Vec<BlobIdentifier> {
+        self.get_filtered_blob_ids(|_| true)
+    }
+
+    pub fn get_filtered_blob_ids(&self, filter: impl Fn(usize) -> bool) -> Vec<BlobIdentifier> {
+        let num_blobs_expected = self.num_blobs_expected();
+        let mut blob_ids = Vec::with_capacity(num_blobs_expected);
+        for i in 0..num_blobs_expected {
+            if filter(i) {
+                blob_ids.push(BlobIdentifier {
+                    block_root: self.import_data.block_root,
+                    index: i as u64,
+                });
+            }
+        }
+        blob_ids
+    }
 }
 
 pub struct BlockImportData<E: EthSpec> {
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index b2e2e609d..3046c8b39 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -10,7 +10,7 @@ use parking_lot::{Mutex, RwLock};
 use slot_clock::SlotClock;
 use ssz_types::{Error, VariableList};
 use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions;
-use std::collections::hash_map::Entry;
+use std::collections::hash_map::{Entry, OccupiedEntry};
 use std::collections::HashMap;
 use std::sync::Arc;
 use types::beacon_block_body::KzgCommitments;
@@ -65,12 +65,42 @@ struct GossipBlobCache<T: EthSpec> {
     executed_block: Option<AvailabilityPendingExecutedBlock<T>>,
 }
 
+impl<T: EthSpec> GossipBlobCache<T> {
+    fn new_from_blob(blob: KzgVerifiedBlob<T>) -> Self {
+        Self {
+            verified_blobs: vec![blob],
+            executed_block: None,
+        }
+    }
+
+    fn new_from_block(block: AvailabilityPendingExecutedBlock<T>) -> Self {
+        Self {
+            verified_blobs: vec![],
+            executed_block: Some(block),
+        }
+    }
+
+    fn has_all_blobs(&self, block: &AvailabilityPendingExecutedBlock<T>) -> bool {
+        self.verified_blobs.len() == block.num_blobs_expected()
+    }
+}
+
 pub enum Availability<T: EthSpec> {
     PendingBlobs(Vec<BlobIdentifier>),
     PendingBlock(Hash256),
     Available(Box<AvailableExecutedBlock<T>>),
 }
 
+impl<T: EthSpec> Availability<T> {
+    pub fn get_available_blob_ids(&self) -> Option<Vec<BlobIdentifier>> {
+        if let Self::Available(block) = self {
+            Some(block.get_all_blob_ids())
+        } else {
+            None
+        }
+    }
+}
+
 impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
     pub fn new(slot_clock: S, kzg: Option<Arc<Kzg>>, spec: ChainSpec) -> Self {
         Self {
@@ -90,23 +120,19 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
     /// This should only accept gossip verified blobs, so we should not have to worry about dupes.
     pub fn put_gossip_blob(
         &self,
-        verified_blob: GossipVerifiedBlob<T>,
+        gossip_blob: GossipVerifiedBlob<T>,
     ) -> Result<Availability<T>, AvailabilityCheckError> {
-        let block_root = verified_blob.block_root();
+        let block_root = gossip_blob.block_root();
+
+        // Verify the KZG commitments.
         let kzg_verified_blob = if let Some(kzg) = self.kzg.as_ref() {
-            verify_kzg_for_blob(verified_blob, kzg)?
+            verify_kzg_for_blob(gossip_blob, kzg)?
         } else {
             return Err(AvailabilityCheckError::KzgNotInitialized);
        };
 
-        //TODO(sean) can we just use a referece to the blob here?
         let blob = kzg_verified_blob.clone_blob();
 
-        // check if we have a block
-        // check if the complete set matches the block
-        // verify, otherwise cache
-
         let mut blob_cache = self.gossip_blob_cache.lock();
 
         // Gossip cache.
@@ -121,25 +147,25 @@
                     .insert(blob.index as usize, kzg_verified_blob);
 
                 if let Some(executed_block) = cache.executed_block.take() {
-                    self.check_block_availability_or_cache(cache, executed_block)?
+                    self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
                 } else {
                     Availability::PendingBlock(block_root)
                 }
             }
             Entry::Vacant(vacant_entry) => {
                 let block_root = kzg_verified_blob.block_root();
-                vacant_entry.insert(GossipBlobCache {
-                    verified_blobs: vec![kzg_verified_blob],
-                    executed_block: None,
-                });
+                vacant_entry.insert(GossipBlobCache::new_from_blob(kzg_verified_blob));
                 Availability::PendingBlock(block_root)
             }
         };
 
         drop(blob_cache);
 
-        // RPC cache.
-        self.rpc_blob_cache.write().insert(blob.id(), blob.clone());
+        if let Some(blob_ids) = availability.get_available_blob_ids() {
+            self.prune_rpc_blob_cache(&blob_ids);
+        } else {
+            self.rpc_blob_cache.write().insert(blob.id(), blob.clone());
+        }
 
         Ok(availability)
     }
@@ -154,49 +180,40 @@
         let entry = guard.entry(executed_block.import_data.block_root);
 
         let availability = match entry {
-            Entry::Occupied(mut occupied_entry) => {
-                let cache: &mut GossipBlobCache<T> = occupied_entry.get_mut();
-
-                self.check_block_availability_or_cache(cache, executed_block)?
+            Entry::Occupied(occupied_entry) => {
+                self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
             }
             Entry::Vacant(vacant_entry) => {
-                let kzg_commitments_len = executed_block.block.kzg_commitments()?.len();
-                let mut blob_ids = Vec::with_capacity(kzg_commitments_len);
-                for i in 0..kzg_commitments_len {
-                    blob_ids.push(BlobIdentifier {
-                        block_root: executed_block.import_data.block_root,
-                        index: i as u64,
-                    });
-                }
-
-                vacant_entry.insert(GossipBlobCache {
-                    verified_blobs: vec![],
-                    executed_block: Some(executed_block),
-                });
-
-                Availability::PendingBlobs(blob_ids)
+                let all_blob_ids = executed_block.get_all_blob_ids();
+                vacant_entry.insert(GossipBlobCache::new_from_block(executed_block));
+                Availability::PendingBlobs(all_blob_ids)
             }
         };
+
+        drop(guard);
+
+        if let Some(blob_ids) = availability.get_available_blob_ids() {
+            self.prune_rpc_blob_cache(&blob_ids);
+        }
 
         Ok(availability)
     }
 
-    fn check_block_availability_or_cache(
+    fn check_block_availability_maybe_cache(
         &self,
-        cache: &mut GossipBlobCache<T>,
+        mut occupied_entry: OccupiedEntry<Hash256, GossipBlobCache<T>>,
         executed_block: AvailabilityPendingExecutedBlock<T>,
     ) -> Result<Availability<T>, AvailabilityCheckError> {
-        let AvailabilityPendingExecutedBlock {
-            block,
-            import_data,
-            payload_verification_outcome,
-        } = executed_block;
-        let kzg_commitments_len = block.kzg_commitments()?.len();
-        let verified_commitments_len = cache.verified_blobs.len();
-        if kzg_commitments_len == verified_commitments_len {
-            //TODO(sean) can we remove this clone
-            let blobs = cache.verified_blobs.clone();
-            let available_block = self.make_available(block, blobs)?;
+        if occupied_entry.get().has_all_blobs(&executed_block) {
+            let AvailabilityPendingExecutedBlock {
+                block,
+                import_data,
+                payload_verification_outcome,
+            } = executed_block;
+
+            let cache = occupied_entry.remove();
+
+            let available_block = self.make_available(block, cache.verified_blobs)?;
             Ok(Availability::Available(Box::new(
                 AvailableExecutedBlock::new(
                     available_block,
@@ -205,25 +222,14 @@
                     import_data,
                     payload_verification_outcome,
                 ),
             )))
         } else {
-            let mut missing_blobs = Vec::with_capacity(kzg_commitments_len);
-            for i in 0..kzg_commitments_len {
-                if cache.verified_blobs.get(i).is_none() {
-                    missing_blobs.push(BlobIdentifier {
-                        block_root: import_data.block_root,
-                        index: i as u64,
-                    })
-                }
-            }
+            let cache = occupied_entry.get_mut();
 
-            let _ = cache
-                .executed_block
-                .insert(AvailabilityPendingExecutedBlock::new(
-                    block,
-                    import_data,
-                    payload_verification_outcome,
-                ));
+            let missing_blob_ids = executed_block
+                .get_filtered_blob_ids(|index| cache.verified_blobs.get(index).is_none());
 
-            Ok(Availability::PendingBlobs(missing_blobs))
+            let _ = cache.executed_block.insert(executed_block);
+
+            Ok(Availability::PendingBlobs(missing_blob_ids))
         }
     }
 
@@ -397,6 +403,13 @@
         self.data_availability_boundary()
            .map_or(false, |da_epoch| block_epoch >= da_epoch)
     }
+
+    pub fn prune_rpc_blob_cache(&self, blob_ids: &[BlobIdentifier]) {
+        let mut guard = self.rpc_blob_cache.write();
+        for id in blob_ids {
+            guard.remove(id);
+        }
+    }
 }
 
 pub enum BlobRequirements {
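
Not part of the upstream patch: the pruning above hinges on `Availability::get_available_blob_ids`, which returns blob ids only once a block has reached the `Available` state; `put_gossip_blob` and `put_pending_executed_block` then call `prune_rpc_blob_cache` to drop those ids from the RPC blob cache rather than caching further. The sketch below illustrates that idea in isolation; `BlobId` and `SimpleRpcBlobCache` are simplified stand-ins invented for the example, not Lighthouse types.

use std::collections::HashMap;

// Simplified stand-in for `BlobIdentifier` (block root reduced to a u64 for the example).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct BlobId {
    block_root: u64,
    index: u64,
}

// Simplified stand-in for the `rpc_blob_cache`: blobs are held until their block is available.
#[derive(Default)]
struct SimpleRpcBlobCache {
    blobs: HashMap<BlobId, Vec<u8>>,
}

impl SimpleRpcBlobCache {
    fn insert(&mut self, id: BlobId, blob: Vec<u8>) {
        self.blobs.insert(id, blob);
    }

    // Mirrors the intent of `prune_rpc_blob_cache`: remove every id covered by an available block.
    fn prune(&mut self, blob_ids: &[BlobId]) {
        for id in blob_ids {
            self.blobs.remove(id);
        }
    }
}

fn main() {
    let mut cache = SimpleRpcBlobCache::default();
    let ids: Vec<BlobId> = (0..4).map(|index| BlobId { block_root: 1, index }).collect();
    for id in &ids {
        cache.insert(id.clone(), vec![0u8; 8]);
    }
    // The block becomes available, so its blobs no longer need to be held for RPC responses.
    cache.prune(&ids);
    assert!(cache.blobs.is_empty());
}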