Unified Availability Cache into One (#4161)
* Unified Availability Cache into One * Update beacon_node/beacon_chain/src/data_availability_checker.rs Co-authored-by: Jimmy Chen <jchen.tc@gmail.com> --------- Co-authored-by: realbigsean <seananderson33@GMAIL.com> Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
parent
ffefd20137
commit
3a21317600
@ -6,12 +6,12 @@ use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecu
|
|||||||
|
|
||||||
use kzg::Error as KzgError;
|
use kzg::Error as KzgError;
|
||||||
use kzg::Kzg;
|
use kzg::Kzg;
|
||||||
use parking_lot::{Mutex, RwLock};
|
use parking_lot::RwLock;
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use ssz_types::{Error, VariableList};
|
use ssz_types::{Error, FixedVector, VariableList};
|
||||||
use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions;
|
use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions;
|
||||||
use std::collections::hash_map::{Entry, OccupiedEntry};
|
use std::collections::hash_map::{Entry, OccupiedEntry};
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use types::beacon_block_body::KzgCommitments;
|
use types::beacon_block_body::KzgCommitments;
|
||||||
use types::blob_sidecar::{BlobIdentifier, BlobSidecar};
|
use types::blob_sidecar::{BlobIdentifier, BlobSidecar};
|
||||||
@ -53,8 +53,7 @@ impl From<ssz_types::Error> for AvailabilityCheckError {
|
|||||||
/// have not been verified against blobs
|
/// have not been verified against blobs
|
||||||
/// - blocks that have been fully verified and only require a data availability check
|
/// - blocks that have been fully verified and only require a data availability check
|
||||||
pub struct DataAvailabilityChecker<T: EthSpec, S: SlotClock> {
|
pub struct DataAvailabilityChecker<T: EthSpec, S: SlotClock> {
|
||||||
rpc_blob_cache: RwLock<HashMap<BlobIdentifier, Arc<BlobSidecar<T>>>>,
|
availability_cache: RwLock<HashMap<Hash256, ReceivedComponents<T>>>,
|
||||||
gossip_availability_cache: Mutex<HashMap<Hash256, GossipAvailabilityCache<T>>>,
|
|
||||||
slot_clock: S,
|
slot_clock: S,
|
||||||
kzg: Option<Arc<Kzg>>,
|
kzg: Option<Arc<Kzg>>,
|
||||||
spec: ChainSpec,
|
spec: ChainSpec,
|
||||||
@ -65,16 +64,20 @@ pub struct DataAvailabilityChecker<T: EthSpec, S: SlotClock> {
|
|||||||
///
|
///
|
||||||
/// The blobs are all gossip and kzg verified.
|
/// The blobs are all gossip and kzg verified.
|
||||||
/// The block has completed all verifications except the availability check.
|
/// The block has completed all verifications except the availability check.
|
||||||
struct GossipAvailabilityCache<T: EthSpec> {
|
struct ReceivedComponents<T: EthSpec> {
|
||||||
/// We use a `BTreeMap` here to maintain the order of `BlobSidecar`s based on index.
|
/// We use a `BTreeMap` here to maintain the order of `BlobSidecar`s based on index.
|
||||||
verified_blobs: BTreeMap<u64, KzgVerifiedBlob<T>>,
|
verified_blobs: FixedVector<Option<KzgVerifiedBlob<T>>, T::MaxBlobsPerBlock>,
|
||||||
executed_block: Option<AvailabilityPendingExecutedBlock<T>>,
|
executed_block: Option<AvailabilityPendingExecutedBlock<T>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: EthSpec> GossipAvailabilityCache<T> {
|
impl<T: EthSpec> ReceivedComponents<T> {
|
||||||
fn new_from_blob(blob: KzgVerifiedBlob<T>) -> Self {
|
fn new_from_blob(blob: KzgVerifiedBlob<T>) -> Self {
|
||||||
let mut verified_blobs = BTreeMap::new();
|
let mut verified_blobs = FixedVector::<_, _>::default();
|
||||||
verified_blobs.insert(blob.blob_index(), blob);
|
// TODO: verify that we've already ensured the blob index < T::MaxBlobsPerBlock
|
||||||
|
if let Some(mut_maybe_blob) = verified_blobs.get_mut(blob.blob_index() as usize) {
|
||||||
|
*mut_maybe_blob = Some(blob);
|
||||||
|
}
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
verified_blobs,
|
verified_blobs,
|
||||||
executed_block: None,
|
executed_block: None,
|
||||||
@ -83,7 +86,7 @@ impl<T: EthSpec> GossipAvailabilityCache<T> {
|
|||||||
|
|
||||||
fn new_from_block(block: AvailabilityPendingExecutedBlock<T>) -> Self {
|
fn new_from_block(block: AvailabilityPendingExecutedBlock<T>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
verified_blobs: BTreeMap::new(),
|
verified_blobs: <_>::default(),
|
||||||
executed_block: Some(block),
|
executed_block: Some(block),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -91,7 +94,17 @@ impl<T: EthSpec> GossipAvailabilityCache<T> {
|
|||||||
/// Returns `true` if the cache has all blobs corresponding to the
|
/// Returns `true` if the cache has all blobs corresponding to the
|
||||||
/// kzg commitments in the block.
|
/// kzg commitments in the block.
|
||||||
fn has_all_blobs(&self, block: &AvailabilityPendingExecutedBlock<T>) -> bool {
|
fn has_all_blobs(&self, block: &AvailabilityPendingExecutedBlock<T>) -> bool {
|
||||||
self.verified_blobs.len() == block.num_blobs_expected()
|
for i in 0..block.num_blobs_expected() {
|
||||||
|
if self
|
||||||
|
.verified_blobs
|
||||||
|
.get(i)
|
||||||
|
.map(|maybe_blob| maybe_blob.is_none())
|
||||||
|
.unwrap_or(true)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -120,17 +133,22 @@ impl<T: EthSpec> Availability<T> {
|
|||||||
impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
||||||
pub fn new(slot_clock: S, kzg: Option<Arc<Kzg>>, spec: ChainSpec) -> Self {
|
pub fn new(slot_clock: S, kzg: Option<Arc<Kzg>>, spec: ChainSpec) -> Self {
|
||||||
Self {
|
Self {
|
||||||
rpc_blob_cache: <_>::default(),
|
availability_cache: <_>::default(),
|
||||||
gossip_availability_cache: <_>::default(),
|
|
||||||
slot_clock,
|
slot_clock,
|
||||||
kzg,
|
kzg,
|
||||||
spec,
|
spec,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a blob from the RPC cache.
|
/// Get a blob from the availability cache.
|
||||||
pub fn get_blob(&self, blob_id: &BlobIdentifier) -> Option<Arc<BlobSidecar<T>>> {
|
pub fn get_blob(&self, blob_id: &BlobIdentifier) -> Option<Arc<BlobSidecar<T>>> {
|
||||||
self.rpc_blob_cache.read().get(blob_id).cloned()
|
self.availability_cache
|
||||||
|
.read()
|
||||||
|
.get(&blob_id.block_root)?
|
||||||
|
.verified_blobs
|
||||||
|
.get(blob_id.index as usize)?
|
||||||
|
.as_ref()
|
||||||
|
.map(|kzg_verified_blob| kzg_verified_blob.clone_blob())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This first validates the KZG commitments included in the blob sidecar.
|
/// This first validates the KZG commitments included in the blob sidecar.
|
||||||
@ -152,22 +170,24 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
|||||||
return Err(AvailabilityCheckError::KzgNotInitialized);
|
return Err(AvailabilityCheckError::KzgNotInitialized);
|
||||||
};
|
};
|
||||||
|
|
||||||
let blob = kzg_verified_blob.clone_blob();
|
let availability = match self
|
||||||
|
.availability_cache
|
||||||
let mut blob_cache = self.gossip_availability_cache.lock();
|
.write()
|
||||||
|
.entry(kzg_verified_blob.block_root())
|
||||||
// Gossip cache.
|
{
|
||||||
let availability = match blob_cache.entry(blob.block_root) {
|
|
||||||
Entry::Occupied(mut occupied_entry) => {
|
Entry::Occupied(mut occupied_entry) => {
|
||||||
// All blobs reaching this cache should be gossip verified and gossip verification
|
// All blobs reaching this cache should be gossip verified and gossip verification
|
||||||
// should filter duplicates, as well as validate indices.
|
// should filter duplicates, as well as validate indices.
|
||||||
let cache = occupied_entry.get_mut();
|
let received_components = occupied_entry.get_mut();
|
||||||
|
|
||||||
cache
|
if let Some(maybe_verified_blob) = received_components
|
||||||
.verified_blobs
|
.verified_blobs
|
||||||
.insert(kzg_verified_blob.blob_index(), kzg_verified_blob);
|
.get_mut(kzg_verified_blob.blob_index() as usize)
|
||||||
|
{
|
||||||
|
*maybe_verified_blob = Some(kzg_verified_blob)
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(executed_block) = cache.executed_block.take() {
|
if let Some(executed_block) = received_components.executed_block.take() {
|
||||||
self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
|
self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
|
||||||
} else {
|
} else {
|
||||||
Availability::PendingBlock(block_root)
|
Availability::PendingBlock(block_root)
|
||||||
@ -175,19 +195,11 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
|||||||
}
|
}
|
||||||
Entry::Vacant(vacant_entry) => {
|
Entry::Vacant(vacant_entry) => {
|
||||||
let block_root = kzg_verified_blob.block_root();
|
let block_root = kzg_verified_blob.block_root();
|
||||||
vacant_entry.insert(GossipAvailabilityCache::new_from_blob(kzg_verified_blob));
|
vacant_entry.insert(ReceivedComponents::new_from_blob(kzg_verified_blob));
|
||||||
Availability::PendingBlock(block_root)
|
Availability::PendingBlock(block_root)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
drop(blob_cache);
|
|
||||||
|
|
||||||
if let Some(blob_ids) = availability.get_available_blob_ids() {
|
|
||||||
self.prune_rpc_blob_cache(&blob_ids);
|
|
||||||
} else {
|
|
||||||
self.rpc_blob_cache.write().insert(blob.id(), blob.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(availability)
|
Ok(availability)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -197,26 +209,21 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
|||||||
&self,
|
&self,
|
||||||
executed_block: AvailabilityPendingExecutedBlock<T>,
|
executed_block: AvailabilityPendingExecutedBlock<T>,
|
||||||
) -> Result<Availability<T>, AvailabilityCheckError> {
|
) -> Result<Availability<T>, AvailabilityCheckError> {
|
||||||
let mut guard = self.gossip_availability_cache.lock();
|
let availability = match self
|
||||||
let entry = guard.entry(executed_block.import_data.block_root);
|
.availability_cache
|
||||||
|
.write()
|
||||||
let availability = match entry {
|
.entry(executed_block.import_data.block_root)
|
||||||
|
{
|
||||||
Entry::Occupied(occupied_entry) => {
|
Entry::Occupied(occupied_entry) => {
|
||||||
self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
|
self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
|
||||||
}
|
}
|
||||||
Entry::Vacant(vacant_entry) => {
|
Entry::Vacant(vacant_entry) => {
|
||||||
let all_blob_ids = executed_block.get_all_blob_ids();
|
let all_blob_ids = executed_block.get_all_blob_ids();
|
||||||
vacant_entry.insert(GossipAvailabilityCache::new_from_block(executed_block));
|
vacant_entry.insert(ReceivedComponents::new_from_block(executed_block));
|
||||||
Availability::PendingBlobs(all_blob_ids)
|
Availability::PendingBlobs(all_blob_ids)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
drop(guard);
|
|
||||||
|
|
||||||
if let Some(blob_ids) = availability.get_available_blob_ids() {
|
|
||||||
self.prune_rpc_blob_cache(&blob_ids);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(availability)
|
Ok(availability)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,21 +236,27 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
|||||||
/// Returns `Ok(Availability::PendingBlobs(_))` if all corresponding blobs have not been received in the cache.
|
/// Returns `Ok(Availability::PendingBlobs(_))` if all corresponding blobs have not been received in the cache.
|
||||||
fn check_block_availability_maybe_cache(
|
fn check_block_availability_maybe_cache(
|
||||||
&self,
|
&self,
|
||||||
mut occupied_entry: OccupiedEntry<Hash256, GossipAvailabilityCache<T>>,
|
mut occupied_entry: OccupiedEntry<Hash256, ReceivedComponents<T>>,
|
||||||
executed_block: AvailabilityPendingExecutedBlock<T>,
|
executed_block: AvailabilityPendingExecutedBlock<T>,
|
||||||
) -> Result<Availability<T>, AvailabilityCheckError> {
|
) -> Result<Availability<T>, AvailabilityCheckError> {
|
||||||
if occupied_entry.get().has_all_blobs(&executed_block) {
|
if occupied_entry.get().has_all_blobs(&executed_block) {
|
||||||
|
let num_blobs_expected = executed_block.num_blobs_expected();
|
||||||
let AvailabilityPendingExecutedBlock {
|
let AvailabilityPendingExecutedBlock {
|
||||||
block,
|
block,
|
||||||
import_data,
|
import_data,
|
||||||
payload_verification_outcome,
|
payload_verification_outcome,
|
||||||
} = executed_block;
|
} = executed_block;
|
||||||
|
|
||||||
let GossipAvailabilityCache {
|
let ReceivedComponents {
|
||||||
verified_blobs,
|
verified_blobs,
|
||||||
executed_block: _,
|
executed_block: _,
|
||||||
} = occupied_entry.remove();
|
} = occupied_entry.remove();
|
||||||
let verified_blobs = verified_blobs.into_values().collect();
|
|
||||||
|
let verified_blobs = Vec::from(verified_blobs)
|
||||||
|
.into_iter()
|
||||||
|
.take(num_blobs_expected)
|
||||||
|
.map(|maybe_blob| maybe_blob.ok_or(AvailabilityCheckError::MissingBlobs))
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
let available_block = self.make_available(block, verified_blobs)?;
|
let available_block = self.make_available(block, verified_blobs)?;
|
||||||
Ok(Availability::Available(Box::new(
|
Ok(Availability::Available(Box::new(
|
||||||
@ -254,12 +267,17 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
|||||||
),
|
),
|
||||||
)))
|
)))
|
||||||
} else {
|
} else {
|
||||||
let cached_entry = occupied_entry.get_mut();
|
let received_components = occupied_entry.get_mut();
|
||||||
|
|
||||||
let missing_blob_ids = executed_block
|
let missing_blob_ids = executed_block.get_filtered_blob_ids(|index| {
|
||||||
.get_filtered_blob_ids(|index| cached_entry.verified_blobs.get(&index).is_none());
|
received_components
|
||||||
|
.verified_blobs
|
||||||
|
.get(index as usize)
|
||||||
|
.map(|maybe_blob| maybe_blob.is_none())
|
||||||
|
.unwrap_or(true)
|
||||||
|
});
|
||||||
|
|
||||||
let _ = cached_entry.executed_block.insert(executed_block);
|
let _ = received_components.executed_block.insert(executed_block);
|
||||||
|
|
||||||
Ok(Availability::PendingBlobs(missing_blob_ids))
|
Ok(Availability::PendingBlobs(missing_blob_ids))
|
||||||
}
|
}
|
||||||
@ -435,13 +453,6 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
|||||||
self.data_availability_boundary()
|
self.data_availability_boundary()
|
||||||
.map_or(false, |da_epoch| block_epoch >= da_epoch)
|
.map_or(false, |da_epoch| block_epoch >= da_epoch)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn prune_rpc_blob_cache(&self, blob_ids: &[BlobIdentifier]) {
|
|
||||||
let mut guard = self.rpc_blob_cache.write();
|
|
||||||
for id in blob_ids {
|
|
||||||
guard.remove(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum BlobRequirements {
|
pub enum BlobRequirements {
|
||||||
|
Loading…
Reference in New Issue
Block a user