Add simple pruning to data availability checker (#4132)

* use block wrapper in sync pairing

* add pruning to the data availability checker

* remove unused function, rename function
This commit is contained in:
realbigsean 2023-03-27 10:09:53 -04:00 committed by GitHub
parent af974dc0b8
commit f580863337
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 120 additions and 65 deletions

View File

@ -93,6 +93,7 @@ use std::time::Duration;
use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp}; use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use task_executor::JoinHandle; use task_executor::JoinHandle;
use tree_hash::TreeHash; use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
use types::ExecPayload; use types::ExecPayload;
use types::{ use types::{
BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch, BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch,
@ -735,6 +736,23 @@ impl<E: EthSpec> AvailableExecutedBlock<E> {
payload_verification_outcome, payload_verification_outcome,
} }
} }
/// Builds the identifier of every blob this block expects.
///
/// One `BlobIdentifier` is produced per entry of the block body's
/// `blob_kzg_commitments()`, using indices `0..len` and this block's root.
/// When `blob_kzg_commitments()` does not yield a commitment list the count
/// falls back to zero, so the returned vector is empty.
pub fn get_all_blob_ids(&self) -> Vec<BlobIdentifier> {
    let block_root = self.import_data.block_root;
    let expected = self
        .block
        .message()
        .body()
        .blob_kzg_commitments()
        .map_or(0, |commitments| commitments.len());

    // A mapped range reports an exact size hint, so `collect` allocates once.
    (0..expected)
        .map(|index| BlobIdentifier {
            block_root,
            index: index as u64,
        })
        .collect()
}
} }
pub struct AvailabilityPendingExecutedBlock<E: EthSpec> { pub struct AvailabilityPendingExecutedBlock<E: EthSpec> {
@ -755,6 +773,30 @@ impl<E: EthSpec> AvailabilityPendingExecutedBlock<E> {
payload_verification_outcome, payload_verification_outcome,
} }
} }
/// Number of blobs this block expects.
///
/// This is the length of the commitment list returned by
/// `self.block.kzg_commitments()`, or `0` when that call yields no list.
pub fn num_blobs_expected(&self) -> usize {
    self.block
        .kzg_commitments()
        .map(|commitments| commitments.len())
        .unwrap_or(0)
}
/// Identifiers for every blob index in `0..num_blobs_expected()` of this block.
pub fn get_all_blob_ids(&self) -> Vec<BlobIdentifier> {
    // An always-true predicate keeps every index.
    let keep_all = |_index: usize| true;
    self.get_filtered_blob_ids(keep_all)
}
/// Identifiers for the expected blob indices that `filter` accepts.
///
/// `filter` is called once per index in `0..num_blobs_expected()`, in
/// ascending order; an identifier carrying this block's root is emitted for
/// each index where it returns `true`.
pub fn get_filtered_blob_ids(&self, filter: impl Fn(usize) -> bool) -> Vec<BlobIdentifier> {
    let expected = self.num_blobs_expected();
    let block_root = self.import_data.block_root;

    // Reserve for the worst case (every index kept) up front.
    let mut selected = Vec::with_capacity(expected);
    selected.extend(
        (0..expected)
            .filter(|&index| filter(index))
            .map(|index| BlobIdentifier {
                block_root,
                index: index as u64,
            }),
    );
    selected
}
} }
pub struct BlockImportData<E: EthSpec> { pub struct BlockImportData<E: EthSpec> {

View File

@ -10,7 +10,7 @@ use parking_lot::{Mutex, RwLock};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use ssz_types::{Error, VariableList}; use ssz_types::{Error, VariableList};
use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions; use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions;
use std::collections::hash_map::Entry; use std::collections::hash_map::{Entry, OccupiedEntry};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use types::beacon_block_body::KzgCommitments; use types::beacon_block_body::KzgCommitments;
@ -65,12 +65,42 @@ struct GossipBlobCache<T: EthSpec> {
executed_block: Option<AvailabilityPendingExecutedBlock<T>>, executed_block: Option<AvailabilityPendingExecutedBlock<T>>,
} }
impl<T: EthSpec> GossipBlobCache<T> {
    /// Creates a cache entry holding one verified blob and no executed block.
    fn new_from_blob(blob: KzgVerifiedBlob<T>) -> Self {
        let verified_blobs = vec![blob];
        Self {
            verified_blobs,
            executed_block: None,
        }
    }

    /// Creates a cache entry holding an executed block and no blobs yet.
    fn new_from_block(block: AvailabilityPendingExecutedBlock<T>) -> Self {
        Self {
            verified_blobs: Vec::new(),
            executed_block: Some(block),
        }
    }

    /// Returns `true` when the number of cached blobs equals
    /// `block.num_blobs_expected()`.
    ///
    /// Only the counts are compared; blob indices are not inspected here.
    fn has_all_blobs(&self, block: &AvailabilityPendingExecutedBlock<T>) -> bool {
        let held = self.verified_blobs.len();
        let expected = block.num_blobs_expected();
        held == expected
    }
}
pub enum Availability<T: EthSpec> { pub enum Availability<T: EthSpec> {
PendingBlobs(Vec<BlobIdentifier>), PendingBlobs(Vec<BlobIdentifier>),
PendingBlock(Hash256), PendingBlock(Hash256),
Available(Box<AvailableExecutedBlock<T>>), Available(Box<AvailableExecutedBlock<T>>),
} }
impl<T: EthSpec> Availability<T> {
    /// Blob identifiers of the wrapped block when this is `Available`.
    ///
    /// Returns `None` for both pending variants.
    pub fn get_available_blob_ids(&self) -> Option<Vec<BlobIdentifier>> {
        match self {
            Self::Available(block) => Some(block.get_all_blob_ids()),
            Self::PendingBlobs(_) | Self::PendingBlock(_) => None,
        }
    }
}
impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> { impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
pub fn new(slot_clock: S, kzg: Option<Arc<Kzg>>, spec: ChainSpec) -> Self { pub fn new(slot_clock: S, kzg: Option<Arc<Kzg>>, spec: ChainSpec) -> Self {
Self { Self {
@ -90,23 +120,19 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
/// This should only accept gossip verified blobs, so we should not have to worry about dupes. /// This should only accept gossip verified blobs, so we should not have to worry about dupes.
pub fn put_gossip_blob( pub fn put_gossip_blob(
&self, &self,
verified_blob: GossipVerifiedBlob<T>, gossip_blob: GossipVerifiedBlob<T>,
) -> Result<Availability<T>, AvailabilityCheckError> { ) -> Result<Availability<T>, AvailabilityCheckError> {
let block_root = verified_blob.block_root(); let block_root = gossip_blob.block_root();
// Verify the KZG commitments.
let kzg_verified_blob = if let Some(kzg) = self.kzg.as_ref() { let kzg_verified_blob = if let Some(kzg) = self.kzg.as_ref() {
verify_kzg_for_blob(verified_blob, kzg)? verify_kzg_for_blob(gossip_blob, kzg)?
} else { } else {
return Err(AvailabilityCheckError::KzgNotInitialized); return Err(AvailabilityCheckError::KzgNotInitialized);
}; };
//TODO(sean) can we just use a reference to the blob here?
let blob = kzg_verified_blob.clone_blob(); let blob = kzg_verified_blob.clone_blob();
// check if we have a block
// check if the complete set matches the block
// verify, otherwise cache
let mut blob_cache = self.gossip_blob_cache.lock(); let mut blob_cache = self.gossip_blob_cache.lock();
// Gossip cache. // Gossip cache.
@ -121,25 +147,25 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
.insert(blob.index as usize, kzg_verified_blob); .insert(blob.index as usize, kzg_verified_blob);
if let Some(executed_block) = cache.executed_block.take() { if let Some(executed_block) = cache.executed_block.take() {
self.check_block_availability_or_cache(cache, executed_block)? self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
} else { } else {
Availability::PendingBlock(block_root) Availability::PendingBlock(block_root)
} }
} }
Entry::Vacant(vacant_entry) => { Entry::Vacant(vacant_entry) => {
let block_root = kzg_verified_blob.block_root(); let block_root = kzg_verified_blob.block_root();
vacant_entry.insert(GossipBlobCache { vacant_entry.insert(GossipBlobCache::new_from_blob(kzg_verified_blob));
verified_blobs: vec![kzg_verified_blob],
executed_block: None,
});
Availability::PendingBlock(block_root) Availability::PendingBlock(block_root)
} }
}; };
drop(blob_cache); drop(blob_cache);
// RPC cache. if let Some(blob_ids) = availability.get_available_blob_ids() {
self.rpc_blob_cache.write().insert(blob.id(), blob.clone()); self.prune_rpc_blob_cache(&blob_ids);
} else {
self.rpc_blob_cache.write().insert(blob.id(), blob.clone());
}
Ok(availability) Ok(availability)
} }
@ -154,49 +180,40 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
let entry = guard.entry(executed_block.import_data.block_root); let entry = guard.entry(executed_block.import_data.block_root);
let availability = match entry { let availability = match entry {
Entry::Occupied(mut occupied_entry) => { Entry::Occupied(occupied_entry) => {
let cache: &mut GossipBlobCache<T> = occupied_entry.get_mut(); self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
self.check_block_availability_or_cache(cache, executed_block)?
} }
Entry::Vacant(vacant_entry) => { Entry::Vacant(vacant_entry) => {
let kzg_commitments_len = executed_block.block.kzg_commitments()?.len(); let all_blob_ids = executed_block.get_all_blob_ids();
let mut blob_ids = Vec::with_capacity(kzg_commitments_len); vacant_entry.insert(GossipBlobCache::new_from_block(executed_block));
for i in 0..kzg_commitments_len { Availability::PendingBlobs(all_blob_ids)
blob_ids.push(BlobIdentifier {
block_root: executed_block.import_data.block_root,
index: i as u64,
});
}
vacant_entry.insert(GossipBlobCache {
verified_blobs: vec![],
executed_block: Some(executed_block),
});
Availability::PendingBlobs(blob_ids)
} }
}; };
drop(guard);
if let Some(blob_ids) = availability.get_available_blob_ids() {
self.prune_rpc_blob_cache(&blob_ids);
}
Ok(availability) Ok(availability)
} }
fn check_block_availability_or_cache( fn check_block_availability_maybe_cache(
&self, &self,
cache: &mut GossipBlobCache<T>, mut occupied_entry: OccupiedEntry<Hash256, GossipBlobCache<T>>,
executed_block: AvailabilityPendingExecutedBlock<T>, executed_block: AvailabilityPendingExecutedBlock<T>,
) -> Result<Availability<T>, AvailabilityCheckError> { ) -> Result<Availability<T>, AvailabilityCheckError> {
let AvailabilityPendingExecutedBlock { if occupied_entry.get().has_all_blobs(&executed_block) {
block, let AvailabilityPendingExecutedBlock {
import_data, block,
payload_verification_outcome, import_data,
} = executed_block; payload_verification_outcome,
let kzg_commitments_len = block.kzg_commitments()?.len(); } = executed_block;
let verified_commitments_len = cache.verified_blobs.len();
if kzg_commitments_len == verified_commitments_len { let cache = occupied_entry.remove();
//TODO(sean) can we remove this clone
let blobs = cache.verified_blobs.clone(); let available_block = self.make_available(block, cache.verified_blobs)?;
let available_block = self.make_available(block, blobs)?;
Ok(Availability::Available(Box::new( Ok(Availability::Available(Box::new(
AvailableExecutedBlock::new( AvailableExecutedBlock::new(
available_block, available_block,
@ -205,25 +222,14 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
), ),
))) )))
} else { } else {
let mut missing_blobs = Vec::with_capacity(kzg_commitments_len); let cache = occupied_entry.get_mut();
for i in 0..kzg_commitments_len {
if cache.verified_blobs.get(i).is_none() {
missing_blobs.push(BlobIdentifier {
block_root: import_data.block_root,
index: i as u64,
})
}
}
let _ = cache let missing_blob_ids = executed_block
.executed_block .get_filtered_blob_ids(|index| cache.verified_blobs.get(index).is_none());
.insert(AvailabilityPendingExecutedBlock::new(
block,
import_data,
payload_verification_outcome,
));
Ok(Availability::PendingBlobs(missing_blobs)) let _ = cache.executed_block.insert(executed_block);
Ok(Availability::PendingBlobs(missing_blob_ids))
} }
} }
@ -397,6 +403,13 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
self.data_availability_boundary() self.data_availability_boundary()
.map_or(false, |da_epoch| block_epoch >= da_epoch) .map_or(false, |da_epoch| block_epoch >= da_epoch)
} }
/// Removes every id listed in `blob_ids` from `rpc_blob_cache`.
///
/// The write guard is acquired once and held for the whole batch; the
/// result of each individual `remove` is discarded.
pub fn prune_rpc_blob_cache(&self, blob_ids: &[BlobIdentifier]) {
    let mut rpc_cache = self.rpc_blob_cache.write();
    blob_ids.iter().for_each(|blob_id| {
        rpc_cache.remove(blob_id);
    });
}
} }
pub enum BlobRequirements { pub enum BlobRequirements {