Update Blob Storage Structure (#4104)

* Initial Changes to Blob Storage

* Add Arc to SignedBlobSidecar Definition
This commit is contained in:
ethDreamer 2023-03-21 14:33:06 -05:00 committed by GitHub
parent b40dceaae9
commit d1e653cfdb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 86 additions and 124 deletions

View File

@ -959,35 +959,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(self.get_block(block_root).await?.map(Arc::new))
}
pub async fn get_block_and_blobs_checking_early_attester_cache(
pub async fn get_blobs_checking_early_attester_cache(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>, Error> {
) -> Result<Option<BlobSidecarList<T::EthSpec>>, Error> {
// If there is no data availability boundary, the Eip4844 fork is disabled.
if let Some(finalized_data_availability_boundary) =
self.finalized_data_availability_boundary()
{
// Only use the attester cache if we can find both the block and blob
if let (Some(block), Some(blobs)) = (
self.early_attester_cache.get_block(*block_root),
self.early_attester_cache.get_blobs(*block_root),
) {
Ok(Some(SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))
// Attempt to get the block and blobs from the database
} else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) {
let blobs = self
.get_blobs(block_root, finalized_data_availability_boundary)?
.map(Arc::new);
Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))
} else {
Ok(None)
}
self.early_attester_cache
.get_blobs(*block_root)
.map_or_else(
|| self.get_blobs(block_root, finalized_data_availability_boundary),
|blobs| Ok(Some(blobs)),
)
} else {
Ok(None)
}
@ -1057,23 +1042,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(Some)
}
// FIXME(jimmy): temporary method added to unblock API work. This method will be replaced by
// the `get_blobs` method below once the new blob sidecar structure (`BlobSidecarList`) is
// implemented in that method.
#[allow(clippy::type_complexity)] // FIXME: this will be fixed by the `BlobSidecarList` alias in Sean's PR
pub fn get_blob_sidecar_list(
&self,
_block_root: &Hash256,
_data_availability_boundary: Epoch,
) -> Result<
Option<
VariableList<Arc<BlobSidecar<T::EthSpec>>, <T::EthSpec as EthSpec>::MaxBlobsPerBlock>,
>,
Error,
> {
unimplemented!("update to use the updated `get_blobs` method instead once this PR is merged: https://github.com/sigp/lighthouse/pull/4104")
}
/// Returns the blobs at the given root, if any.
///
/// Returns `Ok(None)` if the blobs and associated block are not found.
@ -1091,9 +1059,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block_root: &Hash256,
data_availability_boundary: Epoch,
) -> Result<Option<BlobsSidecar<T::EthSpec>>, Error> {
) -> Result<Option<BlobSidecarList<T::EthSpec>>, Error> {
match self.store.get_blobs(block_root)? {
Some(blobs) => Ok(Some(blobs)),
Some(blob_sidecar_list) => Ok(Some(blob_sidecar_list)),
None => {
// Check for the corresponding block to understand whether we *should* have blobs.
self.get_blinded_block(block_root)?
@ -1106,7 +1074,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock),
};
if expected_kzg_commitments.is_empty() {
Ok(BlobsSidecar::empty_from_parts(*block_root, block.slot()))
// TODO (mark): verify this
Ok(BlobSidecarList::empty())
} else if data_availability_boundary <= block.epoch() {
// We should have blobs for all blocks younger than the boundary.
Err(Error::BlobsUnavailable)
@ -3052,7 +3021,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// margin, or younger (of higher epoch number).
if block_epoch >= import_boundary {
if let Some(blobs) = blobs {
if !blobs.blobs.is_empty() {
if !blobs.is_empty() {
//FIXME(sean) using this for debugging for now
info!(
self.log, "Writing blobs to store";
@ -4814,7 +4783,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
.map_err(BlockProductionError::KzgError)?;
let blob_sidecars = VariableList::from(
let blob_sidecars = BlobSidecarList::from(
blobs
.into_iter()
.enumerate()
@ -4827,7 +4796,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get(blob_index)
.expect("KZG proof should exist for blob");
Ok(BlobSidecar {
Ok(Arc::new(BlobSidecar {
block_root: beacon_block_root,
index: blob_index as u64,
slot,
@ -4836,9 +4805,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
blob,
kzg_commitment: *kzg_commitment,
kzg_proof: *kzg_proof,
})
}))
})
.collect::<Result<Vec<BlobSidecar<T::EthSpec>>, BlockProductionError>>()?,
.collect::<Result<Vec<_>, BlockProductionError>>()?,
);
self.blob_cache.put(beacon_block_root, blob_sidecars);

View File

@ -21,7 +21,7 @@ pub struct CacheItem<E: EthSpec> {
* Values used to make the block available.
*/
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<Arc<BlobsSidecar<E>>>,
blobs: Option<BlobSidecarList<E>>,
proto_block: ProtoBlock,
}
@ -160,7 +160,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
}
/// Returns the blobs, if `block_root` matches the cached item.
pub fn get_blobs(&self, block_root: Hash256) -> Option<Arc<BlobsSidecar<E>>> {
pub fn get_blobs(&self, block_root: Hash256) -> Option<BlobSidecarList<E>> {
self.item
.read()
.as_ref()

View File

@ -1,10 +1,10 @@
use crate::{state_id::checkpoint_slot_and_execution_optimistic, ExecutionOptimistic};
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
use eth2::types::{BlockId as CoreBlockId, VariableList};
use eth2::types::BlockId as CoreBlockId;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot};
use types::{BlobSidecarList, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot};
/// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given
/// `BlockId`.
@ -216,16 +216,13 @@ impl BlockId {
pub async fn blob_sidecar_list<T: BeaconChainTypes>(
&self,
chain: &BeaconChain<T>,
) -> Result<
VariableList<Arc<BlobSidecar<T::EthSpec>>, <T::EthSpec as EthSpec>::MaxBlobsPerBlock>,
warp::Rejection,
> {
) -> Result<BlobSidecarList<T::EthSpec>, warp::Rejection> {
let root = self.root(chain)?.0;
let Some(data_availability_boundary) = chain.data_availability_boundary() else {
return Err(warp_utils::reject::custom_not_found("Deneb fork disabled".into()));
};
match chain.get_blob_sidecar_list(&root, data_availability_boundary) {
Ok(Some(blobs)) => Ok(blobs),
match chain.get_blobs(&root, data_availability_boundary) {
Ok(Some(blob_sidecar_list)) => Ok(blob_sidecar_list),
Ok(None) => Err(warp_utils::reject::custom_not_found(format!(
"No blobs with block root {} found in the store",
root

View File

@ -12,6 +12,7 @@ use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
use slog::{debug, error, warn};
use slot_clock::SlotClock;
use std::collections::{hash_map::Entry, HashMap};
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::blob_sidecar::BlobIdentifier;
@ -225,42 +226,34 @@ impl<T: BeaconChainTypes> Worker<T> {
executor.spawn(
async move {
let requested_blobs = request.blob_ids.len();
let mut send_block_count = 0;
let mut send_blob_count = 0;
let mut send_response = true;
let mut blob_list_results = HashMap::new();
for BlobIdentifier{ block_root: root, index } in request.blob_ids.into_iter() {
match self
.chain
.get_block_and_blobs_checking_early_attester_cache(&root)
.await
{
Ok(Some(block_and_blobs)) => {
//
// TODO: HORRIBLE NSFW CODE AHEAD
//
let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs;
let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone();
// TODO: this should be unreachable after this is addressed seriously,
// so for now let's be ok with a panic in the expect.
let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff");
// Intentionally not accessing the list directly
for (known_index, blob) in blob_bundle.into_iter().enumerate() {
if (known_index as u64) == index {
let blob_sidecar = types::BlobSidecar{
block_root: beacon_block_root,
index,
slot: beacon_block_slot,
block_parent_root: block.parent_root,
proposer_index: block.proposer_index,
blob,
kzg_commitment: block.body.blob_kzg_commitments[known_index], // TODO: needs to be stored in a more logical way so that this won't panic.
kzg_proof: kzg_aggregated_proof // TODO: yeah
};
let blob_list_result = match blob_list_results.entry(root) {
Entry::Vacant(entry) => {
entry.insert(self
.chain
.get_blobs_checking_early_attester_cache(&root)
.await)
}
Entry::Occupied(entry) => {
entry.into_mut()
}
};
match blob_list_result.as_ref() {
Ok(Some(blobs_sidecar_list)) => {
for blob_sidecar in blobs_sidecar_list.iter() {
if blob_sidecar.index == index {
self.send_response(
peer_id,
Response::BlobsByRoot(Some(Arc::new(blob_sidecar))),
Response::BlobsByRoot(Some(blob_sidecar.clone())),
request_id,
);
send_block_count += 1;
send_blob_count += 1;
break;
}
}
}
@ -355,7 +348,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"Received BlobsByRoot Request";
"peer" => %peer_id,
"requested" => requested_blobs,
"returned" => send_block_count
"returned" => send_blob_count
);
// send stream termination
@ -837,31 +830,12 @@ impl<T: BeaconChainTypes> Worker<T> {
for root in block_roots {
match self.chain.get_blobs(&root, data_availability_boundary) {
Ok(Some(blobs)) => {
// TODO: more GROSS code ahead. Reader beware
let types::BlobsSidecar {
beacon_block_root,
beacon_block_slot,
blobs: blob_bundle,
kzg_aggregated_proof: _,
}: types::BlobsSidecar<_> = blobs;
for (blob_index, blob) in blob_bundle.into_iter().enumerate() {
let blob_sidecar = types::BlobSidecar {
block_root: beacon_block_root,
index: blob_index as u64,
slot: beacon_block_slot,
block_parent_root: Hash256::zero(),
proposer_index: 0,
blob,
kzg_commitment: types::KzgCommitment::default(),
kzg_proof: types::KzgProof::default(),
};
Ok(Some(blob_sidecar_list)) => {
for blob_sidecar in blob_sidecar_list.iter() {
blobs_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))),
response: Response::BlobsByRange(Some(blob_sidecar.clone())),
id: request_id,
});
}

View File

@ -66,7 +66,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// The hot database also contains all blocks.
pub hot_db: Hot,
/// LRU cache of deserialized blobs. Updated whenever a blob is loaded.
blob_cache: Mutex<LruCache<Hash256, BlobsSidecar<E>>>,
blob_cache: Mutex<LruCache<Hash256, BlobSidecarList<E>>>,
/// LRU cache of deserialized blocks. Updated whenever a block is loaded.
block_cache: Mutex<LruCache<Hash256, SignedBeaconBlock<E>>>,
/// Chain spec.
@ -568,7 +568,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes())
}
pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar<E>) -> Result<(), Error> {
pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobSidecarList<E>) -> Result<(), Error> {
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
blobs_db.put_bytes(
DBColumn::BeaconBlob.into(),
@ -582,7 +582,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn blobs_as_kv_store_ops(
&self,
key: &Hash256,
blobs: &BlobsSidecar<E>,
blobs: BlobSidecarList<E>,
ops: &mut Vec<KeyValueStoreOp>,
) {
let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_bytes());
@ -817,7 +817,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
StoreOp::PutBlobs(block_root, blobs) => {
self.blobs_as_kv_store_ops(&block_root, &blobs, &mut key_value_batch);
self.blobs_as_kv_store_ops(&block_root, blobs, &mut key_value_batch);
}
StoreOp::PutStateSummary(state_root, summary) => {
@ -885,8 +885,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::PutBlobs(_, _) => true,
StoreOp::DeleteBlobs(block_root) => {
match self.get_blobs(block_root) {
Ok(Some(blobs_sidecar)) => {
blobs_to_delete.push(blobs_sidecar);
Ok(Some(blobs_sidecar_list)) => {
blobs_to_delete.push((*block_root, blobs_sidecar_list));
}
Err(e) => {
error!(
@ -926,7 +926,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let reverse_op = match op {
StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root),
StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() {
Some(blobs) => StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs)),
Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs),
None => return Err(HotColdDBError::Rollback.into()),
},
_ => return Err(HotColdDBError::Rollback.into()),
@ -972,7 +972,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
for op in blob_cache_ops {
match op {
StoreOp::PutBlobs(block_root, blobs) => {
guard_blob.put(block_root, (*blobs).clone());
guard_blob.put(block_root, blobs);
}
StoreOp::DeleteBlobs(block_root) => {
@ -1320,12 +1320,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
/// Fetch a blobs sidecar from the store.
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobSidecarList<E>>, Error> {
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {
Some(ref blobs_bytes) => {
let blobs = BlobsSidecar::from_ssz_bytes(blobs_bytes)?;
let blobs = BlobSidecarList::from_ssz_bytes(blobs_bytes)?;
// FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks,
// may want to attempt to use one again
self.blob_cache.lock().put(*block_root, blobs.clone());

View File

@ -159,7 +159,8 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
pub enum StoreOp<'a, E: EthSpec> {
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
PutState(Hash256, &'a BeaconState<E>),
PutBlobs(Hash256, Arc<BlobsSidecar<E>>),
// TODO (mark): space can be optimized here by de-duplicating data
PutBlobs(Hash256, BlobSidecarList<E>),
PutOrphanedBlobsKey(Hash256),
PutStateSummary(Hash256, HotStateSummary),
PutStateTemporaryFlag(Hash256),

View File

@ -1,5 +1,6 @@
use super::*;
use ethereum_types::{H160, H256, U128, U256};
use std::sync::Arc;
fn int_to_hash256(int: u64) -> Hash256 {
let mut bytes = [0; HASHSIZE];
@ -186,6 +187,24 @@ impl TreeHash for H256 {
}
}
impl<T: TreeHash> TreeHash for Arc<T> {
fn tree_hash_type() -> TreeHashType {
T::tree_hash_type()
}
fn tree_hash_packed_encoding(&self) -> PackedEncoding {
self.as_ref().tree_hash_packed_encoding()
}
fn tree_hash_packing_factor() -> usize {
T::tree_hash_packing_factor()
}
fn tree_hash_root(&self) -> Hash256 {
self.as_ref().tree_hash_root()
}
}
#[cfg(test)]
mod test {
use super::*;

View File

@ -6,6 +6,7 @@ use serde_derive::{Deserialize, Serialize};
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
use std::sync::Arc;
use test_random_derive::TestRandom;
use tree_hash_derive::TreeHash;
@ -47,7 +48,7 @@ pub struct BlobSidecar<T: EthSpec> {
pub kzg_proof: KzgProof,
}
pub type BlobSidecarList<T> = VariableList<BlobSidecar<T>, <T as EthSpec>::MaxBlobsPerBlock>;
pub type BlobSidecarList<T> = VariableList<Arc<BlobSidecar<T>>, <T as EthSpec>::MaxBlobsPerBlock>;
impl<T: EthSpec> SignedRoot for BlobSidecar<T> {}

View File

@ -3,6 +3,7 @@ use derivative::Derivative;
use serde_derive::{Deserialize, Serialize};
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
use std::sync::Arc;
use test_random_derive::TestRandom;
use tree_hash_derive::TreeHash;
@ -23,7 +24,7 @@ use tree_hash_derive::TreeHash;
#[arbitrary(bound = "T: EthSpec")]
#[derivative(Hash(bound = "T: EthSpec"))]
pub struct SignedBlobSidecar<T: EthSpec> {
pub message: BlobSidecar<T>,
pub message: Arc<BlobSidecar<T>>,
pub signature: Signature,
}