From 33dd13c798f36f5db22211968d1ff4271bc8fd71 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 25 Jul 2023 10:51:10 -0400 Subject: [PATCH] Refactor deneb block processing (#4511) * Revert "fix merge" This reverts commit 405e95b0ce15409f06504f45c8d93071523e9539. * refactor deneb block processing * cargo fmt * fix ci --- beacon_node/beacon_chain/src/beacon_chain.rs | 17 +- .../beacon_chain/src/blob_verification.rs | 240 +-------- .../beacon_chain/src/block_verification.rs | 247 ++------- .../src/block_verification_types.rs | 438 ++++++++++++++++ .../src/data_availability_checker.rs | 467 ++++++------------ .../overflow_lru_cache.rs | 29 +- beacon_node/beacon_chain/src/lib.rs | 9 +- beacon_node/beacon_chain/src/test_utils.rs | 63 ++- .../tests/attestation_production.rs | 16 +- .../beacon_chain/tests/block_verification.rs | 84 ++-- .../tests/payload_invalidation.rs | 6 +- beacon_node/beacon_chain/tests/store_tests.rs | 29 +- beacon_node/http_api/src/publish_blocks.rs | 2 +- .../http_api/tests/interactive_tests.rs | 2 +- beacon_node/http_api/tests/status_tests.rs | 4 +- .../gossip_methods.rs | 2 +- .../src/network_beacon_processor/mod.rs | 6 +- .../network_beacon_processor/sync_methods.rs | 17 +- .../network/src/sync/backfill_sync/mod.rs | 6 +- .../network/src/sync/block_lookups/mod.rs | 14 +- .../src/sync/block_lookups/parent_lookup.rs | 8 +- .../sync/block_lookups/single_block_lookup.rs | 26 +- .../network/src/sync/block_lookups/tests.rs | 2 +- .../src/sync/block_sidecar_coupling.rs | 44 +- beacon_node/network/src/sync/manager.rs | 14 +- .../network/src/sync/network_context.rs | 4 +- .../network/src/sync/range_sync/batch.rs | 16 +- .../network/src/sync/range_sync/chain.rs | 4 +- .../network/src/sync/range_sync/range.rs | 9 +- consensus/fork_choice/tests/tests.rs | 18 +- .../state_processing/src/consensus_context.rs | 12 - consensus/types/src/lib.rs | 7 +- consensus/types/src/signed_beacon_block.rs | 21 + 33 files changed, 931 insertions(+), 952 deletions(-) create mode 100644 beacon_node/beacon_chain/src/block_verification_types.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b28d0b81f..d136188b6 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -8,13 +8,16 @@ use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache} use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::blob_cache::BlobCache; -use crate::blob_verification::{self, AsBlock, BlobError, BlockWrapper, GossipVerifiedBlob}; +use crate::blob_verification::{self, BlobError, GossipVerifiedBlob}; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::POS_PANDA_BANNER; use crate::block_verification::{ check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy, get_block_root, - signature_verify_chain_segment, AvailableExecutedBlock, BlockError, BlockImportData, - ExecutedBlock, ExecutionPendingBlock, GossipVerifiedBlock, IntoExecutionPendingBlock, + signature_verify_chain_segment, BlockError, ExecutionPendingBlock, GossipVerifiedBlock, + IntoExecutionPendingBlock, +}; +use crate::block_verification_types::{ + AsBlock, AvailableExecutedBlock, BlockImportData, ExecutedBlock, RpcBlock, }; pub use crate::canonical_head::{CanonicalHead, CanonicalHeadRwLock}; use crate::chain_config::ChainConfig; @@ -122,7 +125,7 @@ use types::*; pub type ForkChoiceError = fork_choice::Error; /// Alias to appease clippy. -type HashBlockTuple = (Hash256, BlockWrapper); +type HashBlockTuple = (Hash256, RpcBlock); /// The time-out before failure during an operation to take a read/write RwLock on the block /// processing cache. @@ -2521,7 +2524,7 @@ impl BeaconChain { /// This method is potentially long-running and should not run on the core executor. pub fn filter_chain_segment( self: &Arc, - chain_segment: Vec>, + chain_segment: Vec>, ) -> Result>, ChainSegmentResult> { // This function will never import any blocks. let imported_blocks = 0; @@ -2627,7 +2630,7 @@ impl BeaconChain { /// `Self::process_block`. pub async fn process_chain_segment( self: &Arc, - chain_segment: Vec>, + chain_segment: Vec>, notify_execution_layer: NotifyExecutionLayer, ) -> ChainSegmentResult { let mut imported_blocks = 0; @@ -2804,7 +2807,7 @@ impl BeaconChain { /// /// - `SignedBeaconBlock` /// - `GossipVerifiedBlock` - /// - `BlockWrapper` + /// - `RpcBlock` /// /// ## Errors /// diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index da3ae2c93..78e48f0ed 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -7,22 +7,18 @@ use crate::beacon_chain::{ BeaconChain, BeaconChainTypes, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, }; -use crate::data_availability_checker::{ - AvailabilityCheckError, AvailabilityPendingBlock, AvailableBlock, -}; +use crate::data_availability_checker::AvailabilityCheckError; use crate::kzg_utils::{validate_blob, validate_blobs}; use crate::BeaconChainError; -use eth2::types::BlockContentsTuple; use kzg::Kzg; use slog::{debug, warn}; use ssz_derive::{Decode, Encode}; -use ssz_types::{FixedVector, VariableList}; +use ssz_types::VariableList; use std::borrow::Cow; -use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; +use types::blob_sidecar::BlobIdentifier; use types::{ - BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecar, BlobSidecarList, ChainSpec, - CloneConfig, Epoch, EthSpec, FullPayload, Hash256, KzgCommitment, RelativeEpoch, - SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, + BeaconState, BeaconStateError, BlobSidecar, BlobSidecarList, ChainSpec, CloneConfig, EthSpec, + Hash256, KzgCommitment, RelativeEpoch, SignedBlobSidecar, Slot, }; #[derive(Debug)] @@ -519,13 +515,12 @@ pub fn verify_kzg_for_blob( /// Note: This function should be preferred over calling `verify_kzg_for_blob` /// in a loop since this function kzg verifies a list of blobs more efficiently. pub fn verify_kzg_for_blob_list( - blob_list: Vec>>, + blob_list: &BlobSidecarList, kzg: &Kzg, -) -> Result, AvailabilityCheckError> { +) -> Result<(), AvailabilityCheckError> { let _timer = crate::metrics::start_timer(&crate::metrics::KZG_VERIFICATION_BATCH_TIMES); let (blobs, (commitments, proofs)): (Vec<_>, (Vec<_>, Vec<_>)) = blob_list - .clone() - .into_iter() + .iter() .map(|blob| (blob.blob.clone(), (blob.kzg_commitment, blob.kzg_proof))) .unzip(); if validate_blobs::( @@ -536,225 +531,8 @@ pub fn verify_kzg_for_blob_list( ) .map_err(AvailabilityCheckError::Kzg)? { - Ok(blob_list - .into_iter() - .map(|blob| KzgVerifiedBlob { blob }) - .collect()) + Ok(()) } else { Err(AvailabilityCheckError::KzgVerificationFailed) } } - -pub type KzgVerifiedBlobList = Vec>; - -#[derive(Debug, Clone)] -pub enum MaybeAvailableBlock { - /// This variant is fully available. - /// i.e. for pre-deneb blocks, it contains a (`SignedBeaconBlock`, `Blobs::None`) and for - /// post-4844 blocks, it contains a `SignedBeaconBlock` and a Blobs variant other than `Blobs::None`. - Available(AvailableBlock), - /// This variant is not fully available and requires blobs to become fully available. - AvailabilityPending(AvailabilityPendingBlock), -} - -/// Trait for common block operations. -pub trait AsBlock { - fn slot(&self) -> Slot; - fn epoch(&self) -> Epoch; - fn parent_root(&self) -> Hash256; - fn state_root(&self) -> Hash256; - fn signed_block_header(&self) -> SignedBeaconBlockHeader; - fn message(&self) -> BeaconBlockRef; - fn as_block(&self) -> &SignedBeaconBlock; - fn block_cloned(&self) -> Arc>; - fn canonical_root(&self) -> Hash256; - fn into_block_wrapper(self) -> BlockWrapper; -} - -impl AsBlock for MaybeAvailableBlock { - fn slot(&self) -> Slot { - self.as_block().slot() - } - fn epoch(&self) -> Epoch { - self.as_block().epoch() - } - fn parent_root(&self) -> Hash256 { - self.as_block().parent_root() - } - fn state_root(&self) -> Hash256 { - self.as_block().state_root() - } - fn signed_block_header(&self) -> SignedBeaconBlockHeader { - self.as_block().signed_block_header() - } - fn message(&self) -> BeaconBlockRef { - self.as_block().message() - } - fn as_block(&self) -> &SignedBeaconBlock { - match &self { - MaybeAvailableBlock::Available(block) => block.as_block(), - MaybeAvailableBlock::AvailabilityPending(block) => block.as_block(), - } - } - fn block_cloned(&self) -> Arc> { - match &self { - MaybeAvailableBlock::Available(block) => block.block_cloned(), - MaybeAvailableBlock::AvailabilityPending(block) => block.block_cloned(), - } - } - fn canonical_root(&self) -> Hash256 { - self.as_block().canonical_root() - } - - fn into_block_wrapper(self) -> BlockWrapper { - match self { - MaybeAvailableBlock::Available(available_block) => available_block.into_block_wrapper(), - MaybeAvailableBlock::AvailabilityPending(pending_block) => { - BlockWrapper::Block(pending_block.to_block()) - } - } - } -} - -impl AsBlock for &MaybeAvailableBlock { - fn slot(&self) -> Slot { - self.as_block().slot() - } - fn epoch(&self) -> Epoch { - self.as_block().epoch() - } - fn parent_root(&self) -> Hash256 { - self.as_block().parent_root() - } - fn state_root(&self) -> Hash256 { - self.as_block().state_root() - } - fn signed_block_header(&self) -> SignedBeaconBlockHeader { - self.as_block().signed_block_header() - } - fn message(&self) -> BeaconBlockRef { - self.as_block().message() - } - fn as_block(&self) -> &SignedBeaconBlock { - match &self { - MaybeAvailableBlock::Available(block) => block.as_block(), - MaybeAvailableBlock::AvailabilityPending(block) => block.as_block(), - } - } - fn block_cloned(&self) -> Arc> { - match &self { - MaybeAvailableBlock::Available(block) => block.block_cloned(), - MaybeAvailableBlock::AvailabilityPending(block) => block.block_cloned(), - } - } - fn canonical_root(&self) -> Hash256 { - self.as_block().canonical_root() - } - - fn into_block_wrapper(self) -> BlockWrapper { - self.clone().into_block_wrapper() - } -} - -#[derive(Debug, Clone, Derivative)] -#[derivative(Hash(bound = "E: EthSpec"))] -pub enum BlockWrapper { - Block(Arc>), - BlockAndBlobs(Arc>, FixedBlobSidecarList), -} - -impl BlockWrapper { - pub fn new(block: Arc>, blobs: Option>) -> Self { - match blobs { - Some(blobs) => { - let blobs = FixedVector::from(blobs.into_iter().map(Some).collect::>()); - BlockWrapper::BlockAndBlobs(block, blobs) - } - None => BlockWrapper::Block(block), - } - } - pub fn deconstruct(self) -> (Arc>, Option>) { - match self { - BlockWrapper::Block(block) => (block, None), - BlockWrapper::BlockAndBlobs(block, blobs) => (block, Some(blobs)), - } - } -} - -impl AsBlock for BlockWrapper { - fn slot(&self) -> Slot { - self.as_block().slot() - } - fn epoch(&self) -> Epoch { - self.as_block().epoch() - } - fn parent_root(&self) -> Hash256 { - self.as_block().parent_root() - } - fn state_root(&self) -> Hash256 { - self.as_block().state_root() - } - fn signed_block_header(&self) -> SignedBeaconBlockHeader { - self.as_block().signed_block_header() - } - fn message(&self) -> BeaconBlockRef { - self.as_block().message() - } - fn as_block(&self) -> &SignedBeaconBlock { - match &self { - BlockWrapper::Block(block) => block, - BlockWrapper::BlockAndBlobs(block, _) => block, - } - } - fn block_cloned(&self) -> Arc> { - match &self { - BlockWrapper::Block(block) => block.clone(), - BlockWrapper::BlockAndBlobs(block, _) => block.clone(), - } - } - fn canonical_root(&self) -> Hash256 { - self.as_block().canonical_root() - } - - fn into_block_wrapper(self) -> BlockWrapper { - self - } -} - -impl BlockWrapper { - pub fn n_blobs(&self) -> usize { - match self { - BlockWrapper::Block(_) => 0, - BlockWrapper::BlockAndBlobs(_, blobs) => blobs.len(), - } - } -} - -impl From>> for BlockWrapper { - fn from(value: Arc>) -> Self { - Self::Block(value) - } -} - -impl From> for BlockWrapper { - fn from(value: SignedBeaconBlock) -> Self { - Self::Block(Arc::new(value)) - } -} - -impl From>> for BlockWrapper { - fn from(value: BlockContentsTuple>) -> Self { - match value.1 { - Some(variable_list) => { - let mut blobs = Vec::with_capacity(E::max_blobs_per_block()); - for blob in variable_list { - if blob.message.index < E::max_blobs_per_block() as u64 { - blobs.insert(blob.message.index as usize, Some(blob.message)); - } - } - Self::BlockAndBlobs(Arc::new(value.0), FixedVector::from(blobs)) - } - None => Self::Block(Arc::new(value.0)), - } - } -} diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 796712ed2..c9e0ee4c9 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -48,13 +48,11 @@ // returned alongside. #![allow(clippy::result_large_err)] -use crate::blob_verification::{ - AsBlock, BlobError, BlockWrapper, GossipVerifiedBlob, GossipVerifiedBlobList, - MaybeAvailableBlock, -}; -use crate::data_availability_checker::{ - AvailabilityCheckError, AvailabilityPendingBlock, AvailableBlock, +use crate::blob_verification::{BlobError, GossipVerifiedBlob}; +use crate::block_verification_types::{ + AsBlock, BlockImportData, GossipVerifiedBlockContents, RpcBlock, }; +use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock}; use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, @@ -99,13 +97,11 @@ use std::time::Duration; use store::{Error as DBError, HotStateSummary, KeyValueStore, SignedBlobSidecarList, StoreOp}; use task_executor::JoinHandle; use tree_hash::TreeHash; -use types::blob_sidecar::BlobIdentifier; use types::ExecPayload; -use types::{ssz_tagged_beacon_state, ssz_tagged_signed_beacon_block}; use types::{ - BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch, - EthSpec, ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, - RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, + BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, CloneConfig, Epoch, EthSpec, + ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, + SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; pub const POS_PANDA_BANNER: &str = r#" @@ -153,7 +149,7 @@ pub enum BlockError { /// /// It's unclear if this block is valid, but it cannot be processed without already knowing /// its parent. - ParentUnknown(BlockWrapper), + ParentUnknown(RpcBlock), /// The block slot is greater than the present slot. /// /// ## Peer scoring @@ -558,7 +554,7 @@ fn process_block_slash_info( /// The given `chain_segment` must contain only blocks from the same epoch, otherwise an error /// will be returned. pub fn signature_verify_chain_segment( - mut chain_segment: Vec<(Hash256, BlockWrapper)>, + mut chain_segment: Vec<(Hash256, RpcBlock)>, chain: &BeaconChain, ) -> Result>, BlockError> { if chain_segment.is_empty() { @@ -595,7 +591,7 @@ pub fn signature_verify_chain_segment( let maybe_available_block = chain .data_availability_checker - .check_availability(block.clone())?; + .check_rpc_block_availability(block.clone())?; // Save the block and its consensus context. The context will have had its proposer index // and attesting indices filled in, which can be used to accelerate later block processing. @@ -625,19 +621,12 @@ pub fn signature_verify_chain_segment( #[derive(Derivative)] #[derivative(Debug(bound = "T: BeaconChainTypes"))] pub struct GossipVerifiedBlock { - pub block: MaybeAvailableBlock, + pub block: Arc>, pub block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, } -impl GossipVerifiedBlock { - /// Useful for publishing after gossip verification. - pub fn into_block_wrapper(self) -> BlockWrapper { - self.block.into_block_wrapper() - } -} - /// A wrapper around a `SignedBeaconBlock` that indicates that all signatures (except the deposit /// signatures) have been verified. pub struct SignatureVerifiedBlock { @@ -669,147 +658,6 @@ pub struct ExecutionPendingBlock { pub payload_verification_handle: PayloadVerificationHandle, } -pub enum ExecutedBlock { - Available(AvailableExecutedBlock), - AvailabilityPending(AvailabilityPendingExecutedBlock), -} - -impl ExecutedBlock { - pub fn as_block(&self) -> &SignedBeaconBlock { - match self { - Self::Available(available) => available.block.block(), - Self::AvailabilityPending(pending) => pending.block.as_block(), - } - } -} - -impl std::fmt::Debug for ExecutedBlock { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.as_block()) - } -} - -impl ExecutedBlock { - pub fn new( - block: MaybeAvailableBlock, - import_data: BlockImportData, - payload_verification_outcome: PayloadVerificationOutcome, - ) -> Self { - match block { - MaybeAvailableBlock::Available(available_block) => { - Self::Available(AvailableExecutedBlock::new( - available_block, - import_data, - payload_verification_outcome, - )) - } - MaybeAvailableBlock::AvailabilityPending(pending_block) => { - Self::AvailabilityPending(AvailabilityPendingExecutedBlock::new( - pending_block, - import_data, - payload_verification_outcome, - )) - } - } - } -} - -#[derive(Debug, PartialEq)] -pub struct AvailableExecutedBlock { - pub block: AvailableBlock, - pub import_data: BlockImportData, - pub payload_verification_outcome: PayloadVerificationOutcome, -} - -impl AvailableExecutedBlock { - pub fn new( - block: AvailableBlock, - import_data: BlockImportData, - payload_verification_outcome: PayloadVerificationOutcome, - ) -> Self { - Self { - block, - import_data, - payload_verification_outcome, - } - } - - pub fn get_all_blob_ids(&self) -> Vec { - let num_blobs_expected = self - .block - .message() - .body() - .blob_kzg_commitments() - .map_or(0, |commitments| commitments.len()); - let mut blob_ids = Vec::with_capacity(num_blobs_expected); - for i in 0..num_blobs_expected { - blob_ids.push(BlobIdentifier { - block_root: self.import_data.block_root, - index: i as u64, - }); - } - blob_ids - } -} - -#[derive(Encode, Decode, Clone)] -pub struct AvailabilityPendingExecutedBlock { - pub block: AvailabilityPendingBlock, - pub import_data: BlockImportData, - pub payload_verification_outcome: PayloadVerificationOutcome, -} - -impl AvailabilityPendingExecutedBlock { - pub fn new( - block: AvailabilityPendingBlock, - import_data: BlockImportData, - payload_verification_outcome: PayloadVerificationOutcome, - ) -> Self { - Self { - block, - import_data, - payload_verification_outcome, - } - } - - pub fn num_blobs_expected(&self) -> usize { - self.block - .kzg_commitments() - .map_or(0, |commitments| commitments.len()) - } - - pub fn get_all_blob_ids(&self) -> Vec { - let block_root = self.import_data.block_root; - self.block - .get_filtered_blob_ids(Some(block_root), |_, _| true) - } - - pub fn get_filtered_blob_ids( - &self, - filter: impl Fn(usize, Hash256) -> bool, - ) -> Vec { - self.block - .get_filtered_blob_ids(Some(self.import_data.block_root), filter) - } -} - -#[derive(Debug, PartialEq, Encode, Decode, Clone)] -// TODO (mark): investigate using an Arc / Arc -// here to make this cheaper to clone -pub struct BlockImportData { - pub block_root: Hash256, - #[ssz(with = "ssz_tagged_beacon_state")] - pub state: BeaconState, - #[ssz(with = "ssz_tagged_signed_beacon_block")] - pub parent_block: SignedBeaconBlock>, - pub parent_eth1_finalization_data: Eth1FinalizationData, - pub confirmed_state_roots: Vec, - pub consensus_context: ConsensusContext, -} - -pub type GossipVerifiedBlockContents = - (GossipVerifiedBlock, Option>); - pub trait IntoGossipVerifiedBlockContents: Sized { fn into_gossip_verified_block( self, @@ -911,27 +759,23 @@ impl GossipVerifiedBlock { block: Arc>, chain: &BeaconChain, ) -> Result> { - let maybe_available = chain - .data_availability_checker - .check_availability(block.into())?; // If the block is valid for gossip we don't supply it to the slasher here because // we assume it will be transformed into a fully verified block. We *do* need to supply // it to the slasher if an error occurs, because that's the end of this block's journey, // and it could be a repeat proposal (a likely cause for slashing!). - let header = maybe_available.signed_block_header(); - Self::new_without_slasher_checks(maybe_available, chain).map_err(|e| { + let header = block.signed_block_header(); + Self::new_without_slasher_checks(block, chain).map_err(|e| { process_block_slash_info(chain, BlockSlashInfo::from_early_error(header, e)) }) } /// As for new, but doesn't pass the block to the slasher. fn new_without_slasher_checks( - block: MaybeAvailableBlock, + block: Arc>, chain: &BeaconChain, ) -> Result> { // Ensure the block is the correct structure for the fork at `block.slot()`. block - .as_block() .fork_name(&chain.spec) .map_err(BlockError::InconsistentFork)?; @@ -947,7 +791,7 @@ impl GossipVerifiedBlock { }); } - let block_root = get_block_root(block.as_block()); + let block_root = get_block_root(&block); // Disallow blocks that conflict with the anchor (weak subjectivity checkpoint), if any. check_block_against_anchor_slot(block.message(), chain)?; @@ -1067,7 +911,7 @@ impl GossipVerifiedBlock { let pubkey = pubkey_cache .get(block.message().proposer_index() as usize) .ok_or_else(|| BlockError::UnknownValidator(block.message().proposer_index()))?; - block.as_block().verify_signature( + block.verify_signature( Some(block_root), pubkey, &fork, @@ -1111,8 +955,7 @@ impl GossipVerifiedBlock { // Having checked the proposer index and the block root we can cache them. let consensus_context = ConsensusContext::new(block.slot()) .set_current_block_root(block_root) - .set_proposer_index(block.as_block().message().proposer_index()) - .set_kzg_commitments_consistent(true); + .set_proposer_index(block.as_block().message().proposer_index()); Ok(Self { block, @@ -1155,11 +998,10 @@ impl SignatureVerifiedBlock { /// /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn new( - block: BlockWrapper, + block: MaybeAvailableBlock, block_root: Hash256, chain: &BeaconChain, ) -> Result> { - let block = chain.data_availability_checker.check_availability(block)?; // Ensure the block is the correct structure for the fork at `block.slot()`. block .as_block() @@ -1182,10 +1024,8 @@ impl SignatureVerifiedBlock { let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); - let mut consensus_context = ConsensusContext::new(block.slot()) - .set_current_block_root(block_root) - // An `AvailabileBlock is passed in here, so we know this check has been run.` - .set_kzg_commitments_consistent(true); + let mut consensus_context = + ConsensusContext::new(block.slot()).set_current_block_root(block_root); signature_verifier.include_all_signatures(block.as_block(), &mut consensus_context)?; @@ -1203,7 +1043,7 @@ impl SignatureVerifiedBlock { /// As for `new` above but producing `BlockSlashInfo`. pub fn check_slashable( - block: BlockWrapper, + block: MaybeAvailableBlock, block_root: Hash256, chain: &BeaconChain, ) -> Result>> { @@ -1238,11 +1078,11 @@ impl SignatureVerifiedBlock { // signature. let mut consensus_context = from.consensus_context; signature_verifier - .include_all_signatures_except_proposal(block.as_block(), &mut consensus_context)?; + .include_all_signatures_except_proposal(block.as_ref(), &mut consensus_context)?; if signature_verifier.verify().is_ok() { Ok(Self { - block, + block: MaybeAvailableBlock::AvailabilityPending(block), block_root: from.block_root, parent: Some(parent), consensus_context, @@ -1299,6 +1139,7 @@ impl IntoExecutionPendingBlock for SignatureVerifiedBloc } } +//TODO(sean) can this be deleted impl IntoExecutionPendingBlock for Arc> { /// Verifies the `SignedBeaconBlock` by first transforming it into a `SignatureVerifiedBlock` /// and then using that implementation of `IntoExecutionPendingBlock` to complete verification. @@ -1311,8 +1152,16 @@ impl IntoExecutionPendingBlock for Arc IntoExecutionPendingBlock for Arc IntoExecutionPendingBlock for BlockWrapper { +impl IntoExecutionPendingBlock for RpcBlock { /// Verifies the `SignedBeaconBlock` by first transforming it into a `SignatureVerifiedBlock` /// and then using that implementation of `IntoExecutionPendingBlock` to complete verification. fn into_execution_pending_block_slashable( @@ -1333,8 +1182,16 @@ impl IntoExecutionPendingBlock for BlockWrapper ExecutionPendingBlock { // because it will revert finalization. Note that the finalized block is stored in fork // choice, so we will not reject any child of the finalized block (this is relevant during // genesis). - return Err(BlockError::ParentUnknown(block.into_block_wrapper())); + return Err(BlockError::ParentUnknown(block.into_rpc_block())); } /* @@ -1826,7 +1683,7 @@ pub fn check_block_is_finalized_checkpoint_or_descendant< block_parent_root: block.parent_root(), }) } else { - Err(BlockError::ParentUnknown(block.into_block_wrapper())) + Err(BlockError::ParentUnknown(block.into_rpc_block())) } } } @@ -1898,8 +1755,8 @@ pub fn get_block_root(block: &SignedBeaconBlock) -> Hash256 { #[allow(clippy::type_complexity)] fn verify_parent_block_is_known( chain: &BeaconChain, - block: MaybeAvailableBlock, -) -> Result<(ProtoBlock, MaybeAvailableBlock), BlockError> { + block: Arc>, +) -> Result<(ProtoBlock, Arc>), BlockError> { if let Some(proto_block) = chain .canonical_head .fork_choice_read_lock() @@ -1907,7 +1764,9 @@ fn verify_parent_block_is_known( { Ok((proto_block, block)) } else { - Err(BlockError::ParentUnknown(block.into_block_wrapper())) + Err(BlockError::ParentUnknown(RpcBlock::new_without_blobs( + block, + ))) } } @@ -1938,7 +1797,7 @@ fn load_parent>( .fork_choice_read_lock() .contains_block(&block.parent_root()) { - return Err(BlockError::ParentUnknown(block.into_block_wrapper())); + return Err(BlockError::ParentUnknown(block.into_rpc_block())); } let block_delay = chain diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs new file mode 100644 index 000000000..e80372a6d --- /dev/null +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -0,0 +1,438 @@ +use crate::blob_verification::GossipVerifiedBlobList; +use crate::data_availability_checker::AvailabilityCheckError; +pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock}; +use crate::eth1_finalization_cache::Eth1FinalizationData; +use crate::{data_availability_checker, GossipVerifiedBlock, PayloadVerificationOutcome}; +use derivative::Derivative; +use ssz_derive::{Decode, Encode}; +use state_processing::ConsensusContext; +use std::sync::Arc; +use types::{ + blob_sidecar::BlobIdentifier, ssz_tagged_beacon_state, ssz_tagged_signed_beacon_block, + ssz_tagged_signed_beacon_block_arc, +}; +use types::{ + BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256, + SignedBeaconBlock, SignedBeaconBlockHeader, Slot, +}; + +/// A block that has been received over RPC. It has 2 internal variants: +/// +/// 1. `BlockAndBlobs`: A fully available post deneb block with all the blobs available. This variant +/// is only constructed after making consistency checks between blocks and blobs. +/// Hence, it is fully self contained w.r.t verification. i.e. this block has all the required +/// data to get verfied and imported into fork choice. +/// +/// 2. `Block`: This can be a fully available pre-deneb block **or** a post-deneb block that may or may +/// not require blobs to be considered fully available. +/// +/// Note: We make a distinction over blocks received over gossip because +/// in a post-deneb world, the blobs corresponding to a given block that are received +/// over rpc do not contain the proposer signature for dos resistance. +#[derive(Debug, Clone, Derivative)] +#[derivative(Hash(bound = "E: EthSpec"))] +pub struct RpcBlock { + block: RpcBlockInner, +} + +/// Note: This variant is intentionally private because we want to safely construct the +/// internal variants after applying consistency checks to ensure that the block and blobs +/// are consistent with respect to each other. +#[derive(Debug, Clone, Derivative)] +#[derivative(Hash(bound = "E: EthSpec"))] +enum RpcBlockInner { + /// Single block lookup response. This should potentially hit the data availability cache. + Block(Arc>), + /// This variant is used with parent lookups and by-range responses. It should have all blobs + /// ordered, all block roots matching, and the correct number of blobs for this block. + BlockAndBlobs(Arc>, BlobSidecarList), +} + +impl RpcBlock { + /// Constructs a `Block` variant. + pub fn new_without_blobs(block: Arc>) -> Self { + Self { + block: RpcBlockInner::Block(block), + } + } + + /// Constructs a new `BlockAndBlobs` variant after making consistency + /// checks between the provided blocks and blobs. + pub fn new( + block: Arc>, + blobs: Option>, + ) -> Result { + if let Some(blobs) = blobs.as_ref() { + data_availability_checker::consistency_checks(&block, blobs)?; + } + let inner = match blobs { + Some(blobs) => RpcBlockInner::BlockAndBlobs(block, blobs), + None => RpcBlockInner::Block(block), + }; + Ok(Self { block: inner }) + } + + pub fn deconstruct(self) -> (Arc>, Option>) { + match self.block { + RpcBlockInner::Block(block) => (block, None), + RpcBlockInner::BlockAndBlobs(block, blobs) => (block, Some(blobs)), + } + } + pub fn n_blobs(&self) -> usize { + match &self.block { + RpcBlockInner::Block(_) => 0, + RpcBlockInner::BlockAndBlobs(_, blobs) => blobs.len(), + } + } +} + +impl From>> for RpcBlock { + fn from(value: Arc>) -> Self { + Self::new_without_blobs(value) + } +} + +impl From> for RpcBlock { + fn from(value: SignedBeaconBlock) -> Self { + Self::new_without_blobs(Arc::new(value)) + } +} + +/// A block that has gone through all pre-deneb block processing checks including block processing +/// and execution by an EL client. This block hasn't completed data availability checks. +/// +/// +/// It contains 2 variants: +/// 1. `Available`: This block has been executed and also contains all data to consider it a +/// fully available block. i.e. for post-deneb, this implies that this contains all the +/// required blobs. +/// 2. `AvailabilityPending`: This block hasn't received all required blobs to consider it a +/// fully available block. +pub enum ExecutedBlock { + Available(AvailableExecutedBlock), + AvailabilityPending(AvailabilityPendingExecutedBlock), +} + +impl ExecutedBlock { + pub fn new( + block: MaybeAvailableBlock, + import_data: BlockImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + match block { + MaybeAvailableBlock::Available(available_block) => { + Self::Available(AvailableExecutedBlock::new( + available_block, + import_data, + payload_verification_outcome, + )) + } + MaybeAvailableBlock::AvailabilityPending(pending_block) => { + Self::AvailabilityPending(AvailabilityPendingExecutedBlock::new( + pending_block, + import_data, + payload_verification_outcome, + )) + } + } + } + + pub fn as_block(&self) -> &SignedBeaconBlock { + match self { + Self::Available(available) => available.block.block(), + Self::AvailabilityPending(pending) => &pending.block, + } + } +} + +/// A block that has completed all pre-deneb block processing checks including verification +/// by an EL client **and** has all requisite blob data to be imported into fork choice. +#[derive(PartialEq)] +pub struct AvailableExecutedBlock { + pub block: AvailableBlock, + pub import_data: BlockImportData, + pub payload_verification_outcome: PayloadVerificationOutcome, +} + +impl AvailableExecutedBlock { + pub fn new( + block: AvailableBlock, + import_data: BlockImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + Self { + block, + import_data, + payload_verification_outcome, + } + } + + pub fn get_all_blob_ids(&self) -> Vec { + let num_blobs_expected = self + .block + .message() + .body() + .blob_kzg_commitments() + .map_or(0, |commitments| commitments.len()); + let mut blob_ids = Vec::with_capacity(num_blobs_expected); + for i in 0..num_blobs_expected { + blob_ids.push(BlobIdentifier { + block_root: self.import_data.block_root, + index: i as u64, + }); + } + blob_ids + } +} + +/// A block that has completed all pre-deneb block processing checks, verification +/// by an EL client but does not have all requisite blob data to get imported into +/// fork choice. +#[derive(Encode, Decode, Clone)] +pub struct AvailabilityPendingExecutedBlock { + #[ssz(with = "ssz_tagged_signed_beacon_block_arc")] + pub block: Arc>, + pub import_data: BlockImportData, + pub payload_verification_outcome: PayloadVerificationOutcome, +} + +impl AvailabilityPendingExecutedBlock { + pub fn new( + block: Arc>, + import_data: BlockImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + Self { + block, + import_data, + payload_verification_outcome, + } + } + + pub fn num_blobs_expected(&self) -> usize { + self.block + .message() + .body() + .blob_kzg_commitments() + .map_or(0, |commitments| commitments.len()) + } + + pub fn get_all_blob_ids(&self) -> Vec { + let block_root = self.import_data.block_root; + self.block + .get_filtered_blob_ids(Some(block_root), |_, _| true) + } + + pub fn get_filtered_blob_ids( + &self, + filter: impl Fn(usize, Hash256) -> bool, + ) -> Vec { + self.block + .get_filtered_blob_ids(Some(self.import_data.block_root), filter) + } +} + +#[derive(Debug, PartialEq, Encode, Decode, Clone)] +// TODO (mark): investigate using an Arc / Arc +// here to make this cheaper to clone +pub struct BlockImportData { + pub block_root: Hash256, + #[ssz(with = "ssz_tagged_beacon_state")] + pub state: BeaconState, + #[ssz(with = "ssz_tagged_signed_beacon_block")] + pub parent_block: SignedBeaconBlock>, + pub parent_eth1_finalization_data: Eth1FinalizationData, + pub confirmed_state_roots: Vec, + pub consensus_context: ConsensusContext, +} + +pub type GossipVerifiedBlockContents = + (GossipVerifiedBlock, Option>); + +/// Trait for common block operations. +pub trait AsBlock { + fn slot(&self) -> Slot; + fn epoch(&self) -> Epoch; + fn parent_root(&self) -> Hash256; + fn state_root(&self) -> Hash256; + fn signed_block_header(&self) -> SignedBeaconBlockHeader; + fn message(&self) -> BeaconBlockRef; + fn as_block(&self) -> &SignedBeaconBlock; + fn block_cloned(&self) -> Arc>; + fn canonical_root(&self) -> Hash256; + fn into_rpc_block(self) -> RpcBlock; +} + +impl AsBlock for Arc> { + fn slot(&self) -> Slot { + SignedBeaconBlock::slot(self) + } + + fn epoch(&self) -> Epoch { + SignedBeaconBlock::epoch(self) + } + + fn parent_root(&self) -> Hash256 { + SignedBeaconBlock::parent_root(self) + } + + fn state_root(&self) -> Hash256 { + SignedBeaconBlock::state_root(self) + } + + fn signed_block_header(&self) -> SignedBeaconBlockHeader { + SignedBeaconBlock::signed_block_header(self) + } + + fn message(&self) -> BeaconBlockRef { + SignedBeaconBlock::message(self) + } + + fn as_block(&self) -> &SignedBeaconBlock { + self + } + + fn block_cloned(&self) -> Arc> { + Arc::>::clone(self) + } + + fn canonical_root(&self) -> Hash256 { + SignedBeaconBlock::canonical_root(self) + } + + fn into_rpc_block(self) -> RpcBlock { + RpcBlock::new_without_blobs(self) + } +} + +impl AsBlock for MaybeAvailableBlock { + fn slot(&self) -> Slot { + self.as_block().slot() + } + fn epoch(&self) -> Epoch { + self.as_block().epoch() + } + fn parent_root(&self) -> Hash256 { + self.as_block().parent_root() + } + fn state_root(&self) -> Hash256 { + self.as_block().state_root() + } + fn signed_block_header(&self) -> SignedBeaconBlockHeader { + self.as_block().signed_block_header() + } + fn message(&self) -> BeaconBlockRef { + self.as_block().message() + } + fn as_block(&self) -> &SignedBeaconBlock { + match &self { + MaybeAvailableBlock::Available(block) => block.as_block(), + MaybeAvailableBlock::AvailabilityPending(block) => block, + } + } + fn block_cloned(&self) -> Arc> { + match &self { + MaybeAvailableBlock::Available(block) => block.block_cloned(), + MaybeAvailableBlock::AvailabilityPending(block) => block.clone(), + } + } + fn canonical_root(&self) -> Hash256 { + self.as_block().canonical_root() + } + + fn into_rpc_block(self) -> RpcBlock { + match self { + MaybeAvailableBlock::Available(available_block) => available_block.into_rpc_block(), + MaybeAvailableBlock::AvailabilityPending(block) => RpcBlock::new_without_blobs(block), + } + } +} + +impl AsBlock for AvailableBlock { + fn slot(&self) -> Slot { + self.block().slot() + } + + fn epoch(&self) -> Epoch { + self.block().epoch() + } + + fn parent_root(&self) -> Hash256 { + self.block().parent_root() + } + + fn state_root(&self) -> Hash256 { + self.block().state_root() + } + + fn signed_block_header(&self) -> SignedBeaconBlockHeader { + self.block().signed_block_header() + } + + fn message(&self) -> BeaconBlockRef { + self.block().message() + } + + fn as_block(&self) -> &SignedBeaconBlock { + self.block() + } + + fn block_cloned(&self) -> Arc> { + AvailableBlock::block_cloned(self) + } + + fn canonical_root(&self) -> Hash256 { + self.block().canonical_root() + } + + fn into_rpc_block(self) -> RpcBlock { + let (block, blobs_opt) = self.deconstruct(); + // Circumvent the constructor here, because an Available block will have already had + // consistency checks performed. + let inner = match blobs_opt { + None => RpcBlockInner::Block(block), + Some(blobs) => RpcBlockInner::BlockAndBlobs(block, blobs), + }; + RpcBlock { block: inner } + } +} + +impl AsBlock for RpcBlock { + fn slot(&self) -> Slot { + self.as_block().slot() + } + fn epoch(&self) -> Epoch { + self.as_block().epoch() + } + fn parent_root(&self) -> Hash256 { + self.as_block().parent_root() + } + fn state_root(&self) -> Hash256 { + self.as_block().state_root() + } + fn signed_block_header(&self) -> SignedBeaconBlockHeader { + self.as_block().signed_block_header() + } + fn message(&self) -> BeaconBlockRef { + self.as_block().message() + } + fn as_block(&self) -> &SignedBeaconBlock { + match &self.block { + RpcBlockInner::Block(block) => block, + RpcBlockInner::BlockAndBlobs(block, _) => block, + } + } + fn block_cloned(&self) -> Arc> { + match &self.block { + RpcBlockInner::Block(block) => block.clone(), + RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), + } + } + fn canonical_root(&self) -> Hash256 { + self.as_block().canonical_root() + } + + fn into_rpc_block(self) -> RpcBlock { + self + } +} diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index fc48ba47e..9a53a7139 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -1,28 +1,25 @@ use crate::blob_verification::{ - verify_kzg_for_blob, verify_kzg_for_blob_list, AsBlock, BlockWrapper, GossipVerifiedBlob, - KzgVerifiedBlob, KzgVerifiedBlobList, MaybeAvailableBlock, + verify_kzg_for_blob, verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlob, +}; +use crate::block_verification_types::{ + AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, }; -use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecutedBlock}; - use crate::data_availability_checker::overflow_lru_cache::OverflowLRUCache; use crate::{BeaconChain, BeaconChainTypes, BeaconStore}; use kzg::Error as KzgError; use kzg::Kzg; use slog::{debug, error}; use slot_clock::SlotClock; -use ssz_types::{Error, FixedVector, VariableList}; +use ssz_types::{Error, VariableList}; use std::collections::HashSet; +use std::fmt; +use std::fmt::Debug; use std::sync::Arc; use strum::IntoStaticStr; use task_executor::TaskExecutor; -use types::beacon_block_body::KzgCommitments; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS; -use types::ssz_tagged_signed_beacon_block; -use types::{ - BeaconBlockRef, BlobSidecarList, ChainSpec, Epoch, EthSpec, FullPayload, Hash256, - SignedBeaconBlock, SignedBeaconBlockHeader, Slot, -}; +use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; mod overflow_lru_cache; @@ -50,12 +47,20 @@ pub enum AvailabilityCheckError { }, IncorrectFork, BlobIndexInvalid(u64), + UnorderedBlobs { + blob_index: u64, + expected_index: u64, + }, StoreError(store::Error), DecodeError(ssz::DecodeError), BlockBlobRootMismatch { block_root: Hash256, blob_block_root: Hash256, }, + BlockBlobSlotMismatch { + block_slot: Slot, + blob_slot: Slot, + }, } impl From for AvailabilityCheckError { @@ -92,12 +97,23 @@ pub struct DataAvailabilityChecker { /// /// Indicates if the block is fully `Available` or if we need blobs or blocks /// to "complete" the requirements for an `AvailableBlock`. -#[derive(Debug, PartialEq)] +#[derive(PartialEq)] pub enum Availability { MissingComponents(Hash256), Available(Box>), } +impl Debug for Availability { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::MissingComponents(block_root) => { + write!(f, "MissingComponents({})", block_root) + } + Self::Available(block) => write!(f, "Available({:?})", block.import_data.block_root), + } + } +} + impl Availability { /// Returns all the blob identifiers associated with an `AvailableBlock`. /// Returns `None` if avaiability hasn't been fully satisfied yet. @@ -230,86 +246,51 @@ impl DataAvailabilityChecker { /// Checks if a block is available, returns a `MaybeAvailableBlock` that may include the fully /// available block. - pub fn check_availability( + pub fn check_rpc_block_availability( &self, - block: BlockWrapper, + block: RpcBlock, ) -> Result, AvailabilityCheckError> { - match block { - BlockWrapper::Block(block) => self.check_availability_without_blobs(block), - BlockWrapper::BlockAndBlobs(block, blob_list) => { - let kzg = self - .kzg - .as_ref() - .ok_or(AvailabilityCheckError::KzgNotInitialized)?; - let filtered_blobs = blob_list.iter().flatten().cloned().collect(); - let verified_blobs = verify_kzg_for_blob_list(filtered_blobs, kzg)?; - - Ok(MaybeAvailableBlock::Available( - self.check_availability_with_blobs(block, verified_blobs)?, - )) + let (block, blobs) = block.deconstruct(); + match blobs { + None => { + if self.blobs_required_for_block(&block) { + Ok(MaybeAvailableBlock::AvailabilityPending(block)) + } else { + Ok(MaybeAvailableBlock::Available(AvailableBlock { + block, + blobs: None, + })) + } + } + Some(blob_list) => { + let verified_blobs = if self.blobs_required_for_block(&block) { + let kzg = self + .kzg + .as_ref() + .ok_or(AvailabilityCheckError::KzgNotInitialized)?; + verify_kzg_for_blob_list(&blob_list, kzg)?; + Some(blob_list) + } else { + None + }; + Ok(MaybeAvailableBlock::Available(AvailableBlock { + block, + blobs: verified_blobs, + })) } } } - /// Verifies a block against a set of KZG verified blobs. Returns an AvailableBlock if block's - /// commitments are consistent with the provided verified blob commitments. - pub fn check_availability_with_blobs( - &self, - block: Arc>, - blobs: KzgVerifiedBlobList, - ) -> Result, AvailabilityCheckError> { - match self.check_availability_without_blobs(block)? { - MaybeAvailableBlock::Available(block) => Ok(block), - MaybeAvailableBlock::AvailabilityPending(pending_block) => { - pending_block.make_available(blobs) - } - } - } - - /// Verifies a block as much as possible, returning a MaybeAvailableBlock enum that may include - /// an AvailableBlock if no blobs are required. Otherwise this will return an AvailabilityPendingBlock. - pub fn check_availability_without_blobs( - &self, - block: Arc>, - ) -> Result, AvailabilityCheckError> { - let blob_requirements = self.get_blob_requirements(&block)?; - let blobs = match blob_requirements { - BlobRequirements::EmptyBlobs => VerifiedBlobs::EmptyBlobs, - BlobRequirements::NotRequired => VerifiedBlobs::NotRequired, - BlobRequirements::PreDeneb => VerifiedBlobs::PreDeneb, - BlobRequirements::Required => { - return Ok(MaybeAvailableBlock::AvailabilityPending( - AvailabilityPendingBlock { block }, - )) - } - }; - Ok(MaybeAvailableBlock::Available(AvailableBlock { - block, - blobs, - })) - } - /// Determines the blob requirements for a block. Answers the question: "Does this block require /// blobs?". - fn get_blob_requirements( - &self, - block: &Arc>>, - ) -> Result { - let verified_blobs = - if let Ok(block_kzg_commitments) = block.message().body().blob_kzg_commitments() { - if self.da_check_required(block.epoch()) { - if block_kzg_commitments.is_empty() { - BlobRequirements::EmptyBlobs - } else { - BlobRequirements::Required - } - } else { - BlobRequirements::NotRequired - } - } else { - BlobRequirements::PreDeneb - }; - Ok(verified_blobs) + fn blobs_required_for_block(&self, block: &SignedBeaconBlock) -> bool { + let block_within_da_period = self.da_check_required(block.epoch()); + let block_has_kzg_commitments = block + .message() + .body() + .blob_kzg_commitments() + .map_or(false, |commitments| !commitments.is_empty()); + block_within_da_period && block_has_kzg_commitments } /// The epoch at which we require a data availability check in block processing. @@ -340,6 +321,87 @@ impl DataAvailabilityChecker { } } +/// Verifies an `SignedBeaconBlock` against a set of KZG verified blobs. +/// This does not check whether a block *should* have blobs, these checks should have been +/// completed when producing the `AvailabilityPendingBlock`. +pub fn make_available( + block: Arc>, + blobs: Vec>, +) -> Result, AvailabilityCheckError> { + let blobs = VariableList::new(blobs.into_iter().map(|blob| blob.to_blob()).collect())?; + + consistency_checks(&block, &blobs)?; + + Ok(AvailableBlock { + block, + blobs: Some(blobs), + }) +} + +/// Makes the following checks to ensure that the list of blobs correspond block: +/// +/// * Check that a block is post-deneb +/// * Checks that the number of blobs is equal to the length of kzg commitments in the list +/// * Checks that the index, slot, root and kzg_commitment in the block match the blobs in the correct order +/// +/// Returns `Ok(())` if all consistency checks pass and an error otherwise. +pub fn consistency_checks( + block: &SignedBeaconBlock, + blobs: &[Arc>], +) -> Result<(), AvailabilityCheckError> { + let Ok(block_kzg_commitments) = block + .message() + .body() + .blob_kzg_commitments() else { + return Ok(()) + }; + + if blobs.len() != block_kzg_commitments.len() { + return Err(AvailabilityCheckError::NumBlobsMismatch { + num_kzg_commitments: block_kzg_commitments.len(), + num_blobs: blobs.len(), + }); + } + + if block_kzg_commitments.is_empty() { + return Ok(()); + } + + let block_root = blobs + .first() + .map(|blob| blob.block_root) + .unwrap_or(block.canonical_root()); + for (index, (block_commitment, blob)) in + block_kzg_commitments.iter().zip(blobs.iter()).enumerate() + { + let index = index as u64; + if index != blob.index { + return Err(AvailabilityCheckError::UnorderedBlobs { + blob_index: blob.index, + expected_index: index, + }); + } + if block_root != blob.block_root { + return Err(AvailabilityCheckError::BlockBlobRootMismatch { + block_root, + blob_block_root: blob.block_root, + }); + } + if block.slot() != blob.slot { + return Err(AvailabilityCheckError::BlockBlobSlotMismatch { + block_slot: block.slot(), + blob_slot: blob.slot, + }); + } + if *block_commitment != blob.kzg_commitment { + return Err(AvailabilityCheckError::KzgCommitmentMismatch { + blob_index: blob.index, + }); + } + } + Ok(()) +} + pub fn start_availability_cache_maintenance_service( executor: TaskExecutor, chain: Arc>, @@ -425,244 +487,37 @@ async fn availability_cache_maintenance_service( } } -pub enum BlobRequirements { - Required, - /// This block is from outside the data availability boundary so doesn't require - /// a data availability check. - NotRequired, - /// The block's `kzg_commitments` field is empty so it does not contain any blobs. - EmptyBlobs, - /// This is a block prior to the 4844 fork, so doesn't require any blobs - PreDeneb, -} - -/// A wrapper over a `SignedBeaconBlock` where we have not verified availability of -/// corresponding `BlobSidecar`s and hence, is not ready for import into fork choice. -/// -/// Note: This wrapper does not necessarily correspond to a pre-deneb block as a pre-deneb -/// block that is ready for import will be of type `AvailableBlock` with its `blobs` field -/// set to `VerifiedBlobs::PreDeneb`. -#[derive(Clone, Debug, PartialEq)] -pub struct AvailabilityPendingBlock { - block: Arc>, -} - -impl AvailabilityPendingBlock { - pub fn slot(&self) -> Slot { - self.block.slot() - } - pub fn num_blobs_expected(&self) -> usize { - self.block.num_expected_blobs() - } - - pub fn get_all_blob_ids(&self, block_root: Option) -> Vec { - self.block.get_expected_blob_ids(block_root) - } - - pub fn get_filtered_blob_ids( - &self, - block_root: Option, - filter: impl Fn(usize, Hash256) -> bool, - ) -> Vec { - self.block.get_filtered_blob_ids(block_root, filter) - } -} - -impl AvailabilityPendingBlock { - pub fn to_block(self) -> Arc> { - self.block - } - pub fn as_block(&self) -> &SignedBeaconBlock { - &self.block - } - pub fn block_cloned(&self) -> Arc> { - self.block.clone() - } - pub fn kzg_commitments(&self) -> Result<&KzgCommitments, AvailabilityCheckError> { - self.block - .message() - .body() - .blob_kzg_commitments() - .map_err(|_| AvailabilityCheckError::IncorrectFork) - } - - /// Verifies an AvailabilityPendingBlock against a set of KZG verified blobs. - /// This does not check whether a block *should* have blobs, these checks should have been - /// completed when producing the `AvailabilityPendingBlock`. - pub fn make_available( - self, - blobs: Vec>, - ) -> Result, AvailabilityCheckError> { - let block_kzg_commitments = self.kzg_commitments()?; - if blobs.len() != block_kzg_commitments.len() { - return Err(AvailabilityCheckError::NumBlobsMismatch { - num_kzg_commitments: block_kzg_commitments.len(), - num_blobs: blobs.len(), - }); - } - - for (block_commitment, blob) in block_kzg_commitments.iter().zip(blobs.iter()) { - if *block_commitment != blob.kzg_commitment() { - return Err(AvailabilityCheckError::KzgCommitmentMismatch { - blob_index: blob.as_blob().index, - }); - } - } - - let blobs = VariableList::new(blobs.into_iter().map(|blob| blob.to_blob()).collect())?; - - Ok(AvailableBlock { - block: self.block, - blobs: VerifiedBlobs::Available(blobs), - }) - } -} - -#[derive(Clone, Debug, PartialEq)] -pub enum VerifiedBlobs { - /// These blobs are available. - Available(BlobSidecarList), - /// This block is from outside the data availability boundary so doesn't require - /// a data availability check. - NotRequired, - /// The block's `kzg_commitments` field is empty so it does not contain any blobs. - EmptyBlobs, - /// This is a block prior to the 4844 fork, so doesn't require any blobs - PreDeneb, -} - -impl VerifiedBlobs { - pub fn to_blobs(self) -> Option> { - match self { - Self::Available(blobs) => Some(blobs), - Self::NotRequired => None, - Self::EmptyBlobs => None, - Self::PreDeneb => None, - } - } -} - /// A fully available block that is ready to be imported into fork choice. #[derive(Clone, Debug, PartialEq)] pub struct AvailableBlock { block: Arc>, - blobs: VerifiedBlobs, + blobs: Option>, } impl AvailableBlock { pub fn block(&self) -> &SignedBeaconBlock { &self.block } - - pub fn da_check_required(&self) -> bool { - match self.blobs { - VerifiedBlobs::PreDeneb | VerifiedBlobs::NotRequired => false, - VerifiedBlobs::EmptyBlobs | VerifiedBlobs::Available(_) => true, - } - } - - pub fn deconstruct(self) -> (Arc>, Option>) { - match self.blobs { - VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreDeneb => { - (self.block, None) - } - VerifiedBlobs::Available(blobs) => (self.block, Some(blobs)), - } - } - - pub fn blobs(&self) -> Option<&BlobSidecarList> { - match &self.blobs { - VerifiedBlobs::Available(blobs) => Some(blobs), - _ => None, - } - } -} - -impl AsBlock for AvailableBlock { - fn slot(&self) -> Slot { - self.block.slot() - } - - fn epoch(&self) -> Epoch { - self.block.epoch() - } - - fn parent_root(&self) -> Hash256 { - self.block.parent_root() - } - - fn state_root(&self) -> Hash256 { - self.block.state_root() - } - - fn signed_block_header(&self) -> SignedBeaconBlockHeader { - self.block.signed_block_header() - } - - fn message(&self) -> BeaconBlockRef { - self.block.message() - } - - fn as_block(&self) -> &SignedBeaconBlock { - &self.block - } - - fn block_cloned(&self) -> Arc> { + pub fn block_cloned(&self) -> Arc> { self.block.clone() } - fn canonical_root(&self) -> Hash256 { - self.block.canonical_root() + pub fn blobs(&self) -> Option<&BlobSidecarList> { + self.blobs.as_ref() } - fn into_block_wrapper(self) -> BlockWrapper { - let (block, blobs_opt) = self.deconstruct(); - if let Some(blobs) = blobs_opt { - let blobs_vec = blobs.iter().cloned().map(Option::Some).collect::>(); - BlockWrapper::BlockAndBlobs(block, FixedVector::from(blobs_vec)) - } else { - BlockWrapper::Block(block) - } + pub fn deconstruct(self) -> (Arc>, Option>) { + let AvailableBlock { block, blobs } = self; + (block, blobs) } } -// The standard implementation of Encode for SignedBeaconBlock -// requires us to use ssz(enum_behaviour = "transparent"). This -// prevents us from implementing Decode. We need to use a -// custom Encode and Decode in this wrapper object that essentially -// encodes it as if it were ssz(enum_behaviour = "union") -impl ssz::Encode for AvailabilityPendingBlock { - fn is_ssz_fixed_len() -> bool { - ssz_tagged_signed_beacon_block::encode::is_ssz_fixed_len() - } - - fn ssz_append(&self, buf: &mut Vec) { - ssz_tagged_signed_beacon_block::encode::ssz_append(self.block.as_ref(), buf); - } - - fn ssz_bytes_len(&self) -> usize { - ssz_tagged_signed_beacon_block::encode::ssz_bytes_len(self.block.as_ref()) - } -} - -impl ssz::Decode for AvailabilityPendingBlock { - fn is_ssz_fixed_len() -> bool { - ssz_tagged_signed_beacon_block::decode::is_ssz_fixed_len() - } - - fn from_ssz_bytes(bytes: &[u8]) -> Result { - Ok(Self { - block: Arc::new(ssz_tagged_signed_beacon_block::decode::from_ssz_bytes( - bytes, - )?), - }) - } -} - -#[cfg(test)] -mod test { - #[test] - fn check_encode_decode_availability_pending_block() { - // todo.. (difficult to create default beacon blocks to test) - } +#[derive(Debug, Clone)] +pub enum MaybeAvailableBlock { + /// This variant is fully available. + /// i.e. for pre-deneb blocks, it contains a (`SignedBeaconBlock`, `Blobs::None`) and for + /// post-4844 blocks, it contains a `SignedBeaconBlock` and a Blobs variant other than `Blobs::None`. + Available(AvailableBlock), + /// This variant is not fully available and requires blobs to become fully available. + AvailabilityPending(Arc>), } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index cfc2f9576..e08f1994d 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -29,8 +29,10 @@ use crate::beacon_chain::BeaconStore; use crate::blob_verification::KzgVerifiedBlob; -use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecutedBlock}; -use crate::data_availability_checker::{Availability, AvailabilityCheckError}; +use crate::block_verification_types::{ + AsBlock, AvailabilityPendingExecutedBlock, AvailableExecutedBlock, +}; +use crate::data_availability_checker::{make_available, Availability, AvailabilityCheckError}; use crate::store::{DBColumn, KeyValueStore}; use crate::BeaconChainTypes; use lru::LruCache; @@ -102,7 +104,7 @@ impl PendingComponents { pub fn epoch(&self) -> Option { self.executed_block .as_ref() - .map(|pending_block| pending_block.block.as_block().epoch()) + .map(|pending_block| pending_block.block.epoch()) .or_else(|| { for maybe_blob in self.verified_blobs.iter() { if maybe_blob.is_some() { @@ -119,7 +121,7 @@ impl PendingComponents { let block_opt = self .executed_block .as_ref() - .map(|block| block.block.block.clone()); + .map(|block| block.block.clone()); let blobs = self .verified_blobs .iter() @@ -538,7 +540,7 @@ impl OverflowLRUCache { import_data, payload_verification_outcome, } = executed_block; - let available_block = block.make_available(vec![])?; + let available_block = make_available(block, vec![])?; return Ok(Availability::Available(Box::new( AvailableExecutedBlock::new( available_block, @@ -588,7 +590,7 @@ impl OverflowLRUCache { return Ok(Availability::MissingComponents(import_data.block_root)) }; - let available_block = block.make_available(verified_blobs)?; + let available_block = make_available(block, verified_blobs)?; Ok(Availability::Available(Box::new( AvailableExecutedBlock::new( available_block, @@ -758,7 +760,6 @@ impl OverflowLRUCache { value_bytes.as_slice(), )? .block - .as_block() .epoch() } OverflowKey::Blob(_, _) => { @@ -853,8 +854,8 @@ mod test { blob_verification::{ validate_blob_sidecar_for_gossip, verify_kzg_for_blob, GossipVerifiedBlob, }, - block_verification::{BlockImportData, PayloadVerificationOutcome}, - data_availability_checker::AvailabilityPendingBlock, + block_verification::PayloadVerificationOutcome, + block_verification_types::BlockImportData, eth1_finalization_cache::Eth1FinalizationData, test_utils::{BaseHarnessType, BeaconChainHarness, DiskHarnessType}, }; @@ -1129,10 +1130,6 @@ mod test { }; let slot = block.slot(); - let apb: AvailabilityPendingBlock = AvailabilityPendingBlock { - block: Arc::new(block), - }; - let consensus_context = ConsensusContext::::new(slot); let import_data: BlockImportData = BlockImportData { block_root, @@ -1149,7 +1146,7 @@ mod test { }; let availability_pending_block = AvailabilityPendingExecutedBlock { - block: apb, + block: Arc::new(block), import_data, payload_verification_outcome, }; @@ -1301,7 +1298,7 @@ mod test { // we need blocks with blobs continue; } - let root = pending_block.block.block.canonical_root(); + let root = pending_block.block.canonical_root(); pending_blocks.push_back(pending_block); pending_blobs.push_back(blobs); roots.push_back(root); @@ -1462,7 +1459,7 @@ mod test { // we need blocks with blobs continue; } - let root = pending_block.block.as_block().canonical_root(); + let root = pending_block.block.canonical_root(); let epoch = pending_block .block .as_block() diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 3f1ad7581..3410196c8 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -12,6 +12,7 @@ pub mod blob_verification; pub mod block_reward; mod block_times_cache; mod block_verification; +pub mod block_verification_types; pub mod builder; pub mod canonical_head; pub mod capella_readiness; @@ -69,10 +70,12 @@ pub use self::historical_blocks::HistoricalBlockError; pub use attestation_verification::Error as AttestationError; pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError}; pub use block_verification::{ - get_block_root, AvailabilityPendingExecutedBlock, BlockError, ExecutedBlock, - ExecutionPayloadError, ExecutionPendingBlock, GossipVerifiedBlock, IntoExecutionPendingBlock, - IntoGossipVerifiedBlockContents, PayloadVerificationOutcome, PayloadVerificationStatus, + get_block_root, BlockError, ExecutionPayloadError, ExecutionPendingBlock, GossipVerifiedBlock, + IntoExecutionPendingBlock, IntoGossipVerifiedBlockContents, PayloadVerificationOutcome, + PayloadVerificationStatus, }; +pub use block_verification_types::AvailabilityPendingExecutedBlock; +pub use block_verification_types::ExecutedBlock; pub use canonical_head::{CachedHead, CanonicalHead, CanonicalHeadRwLock}; pub use eth1_chain::{Eth1Chain, Eth1ChainBackend}; pub use events::ServerSentEventHandler; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 147a0c3ba..286aa0c0a 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1,4 +1,4 @@ -use crate::blob_verification::{AsBlock, BlockWrapper}; +use crate::block_verification_types::{AsBlock, RpcBlock}; use crate::observed_operations::ObservationOutcome; pub use crate::persisted_beacon_chain::PersistedBeaconChain; pub use crate::{ @@ -694,18 +694,18 @@ where .execution_block_generator() } - pub fn get_head_block(&self) -> BlockWrapper { + pub fn get_head_block(&self) -> RpcBlock { let block = self.chain.head_beacon_block(); let block_root = block.canonical_root(); let blobs = self.chain.get_blobs(&block_root).unwrap(); - BlockWrapper::new(block, blobs) + RpcBlock::new(block, blobs).unwrap() } - pub fn get_full_block(&self, block_root: &Hash256) -> BlockWrapper { + pub fn get_full_block(&self, block_root: &Hash256) -> RpcBlock { let block = self.chain.get_blinded_block(block_root).unwrap().unwrap(); let full_block = self.chain.store.make_full_block(block_root, block).unwrap(); let blobs = self.chain.get_blobs(block_root).unwrap(); - BlockWrapper::new(Arc::new(full_block), blobs) + RpcBlock::new(Arc::new(full_block), blobs).unwrap() } pub fn get_all_validators(&self) -> Vec { @@ -1873,18 +1873,31 @@ where (deposits, state) } - pub async fn process_block>>( + pub async fn process_block( &self, slot: Slot, block_root: Hash256, - block: B, + block_contents: BlockContentsTuple>, ) -> Result> { self.set_current_slot(slot); + let (block, blobs) = block_contents; + // Note: we are just dropping signatures here and skipping signature verification. + let blobs_without_signatures = blobs.as_ref().map(|blobs| { + VariableList::from( + blobs + .into_iter() + .map(|blob| blob.message.clone()) + .collect::>(), + ) + }); let block_hash: SignedBeaconBlockHash = self .chain - .process_block(block_root, block.into(), NotifyExecutionLayer::Yes, || { - Ok(()) - }) + .process_block( + block_root, + RpcBlock::new(Arc::new(block), blobs_without_signatures).unwrap(), + NotifyExecutionLayer::Yes, + || Ok(()), + ) .await? .try_into() .unwrap(); @@ -1892,16 +1905,25 @@ where Ok(block_hash) } - pub async fn process_block_result>>( + pub async fn process_block_result( &self, - block: B, + block_contents: BlockContentsTuple>, ) -> Result> { - let wrapped_block = block.into(); + let (block, blobs) = block_contents; + // Note: we are just dropping signatures here and skipping signature verification. + let blobs_without_signatures = blobs.as_ref().map(|blobs| { + VariableList::from( + blobs + .into_iter() + .map(|blob| blob.message.clone()) + .collect::>(), + ) + }); let block_hash: SignedBeaconBlockHash = self .chain .process_block( - wrapped_block.canonical_root(), - wrapped_block, + block.canonical_root(), + RpcBlock::new(Arc::new(block), blobs_without_signatures).unwrap(), NotifyExecutionLayer::Yes, || Ok(()), ) @@ -1976,11 +1998,16 @@ where BlockError, > { self.set_current_slot(slot); - let (block, new_state) = self.make_block(state, slot).await; + let (block_contents, new_state) = self.make_block(state, slot).await; + let block_hash = self - .process_block(slot, block.0.canonical_root(), block.clone()) + .process_block( + slot, + block_contents.0.canonical_root(), + block_contents.clone(), + ) .await?; - Ok((block_hash, block, new_state)) + Ok((block_hash, block_contents, new_state)) } pub fn attest_block( diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 97122c000..907e7a40b 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -1,6 +1,6 @@ #![cfg(not(debug_assertions))] -use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}; use beacon_chain::{StateSkipConfig, WhenSlotSkipped}; use lazy_static::lazy_static; @@ -133,11 +133,11 @@ async fn produces_attestations() { assert_eq!(data.target.epoch, state.current_epoch(), "bad target epoch"); assert_eq!(data.target.root, target_root, "bad target root"); - let block_wrapper = - BlockWrapper::::new(Arc::new(block.clone()), blobs.clone()); - let beacon_chain::blob_verification::MaybeAvailableBlock::Available(available_block) = chain + let rpc_block = + RpcBlock::::new(Arc::new(block.clone()), blobs.clone()).unwrap(); + let beacon_chain::data_availability_checker::MaybeAvailableBlock::Available(available_block) = chain .data_availability_checker - .check_availability(block_wrapper) + .check_rpc_block_availability(rpc_block) .unwrap() else { panic!("block should be available") @@ -209,10 +209,10 @@ async fn early_attester_cache_old_request() { .get_blobs(&head.beacon_block_root) .expect("should get blobs"); - let block_wrapper = BlockWrapper::::new(head.beacon_block.clone(), head_blobs); - let beacon_chain::blob_verification::MaybeAvailableBlock::Available(available_block) = harness.chain + let rpc_block = RpcBlock::::new(head.beacon_block.clone(), head_blobs).unwrap(); + let beacon_chain::data_availability_checker::MaybeAvailableBlock::Available(available_block) = harness.chain .data_availability_checker - .check_availability(block_wrapper) + .check_rpc_block_availability(rpc_block) .unwrap() else { panic!("block should be available") diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 374792364..102707a38 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1,12 +1,10 @@ #![cfg(not(debug_assertions))] -use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, RpcBlock}; use beacon_chain::test_utils::BlobSignatureKey; use beacon_chain::{ - blob_verification::AsBlock, test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, - AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, ExecutedBlock, - ExecutionPendingBlock, + AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, ExecutionPendingBlock, }; use beacon_chain::{ BeaconSnapshot, BlockError, ChainSegmentResult, IntoExecutionPendingBlock, NotifyExecutionLayer, @@ -156,11 +154,13 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness], blobs: &[Option>], -) -> Vec> { +) -> Vec> { chain_segment .iter() .zip(blobs.into_iter()) - .map(|(snapshot, blobs)| BlockWrapper::new(snapshot.beacon_block.clone(), blobs.clone())) + .map(|(snapshot, blobs)| { + RpcBlock::new(snapshot.beacon_block.clone(), blobs.clone()).unwrap() + }) .collect() } @@ -217,7 +217,7 @@ fn update_parent_roots(snapshots: &mut [BeaconSnapshot]) { async fn chain_segment_full_segment() { let harness = get_harness(VALIDATOR_COUNT); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; - let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) + let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) .into_iter() .map(|block| block.into()) .collect(); @@ -256,11 +256,10 @@ async fn chain_segment_varying_chunk_size() { for chunk_size in &[1, 2, 3, 5, 31, 32, 33, 42] { let harness = get_harness(VALIDATOR_COUNT); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; - let blocks: Vec> = - chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .map(|block| block.into()) - .collect(); + let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) + .into_iter() + .map(|block| block.into()) + .collect(); harness .chain @@ -299,11 +298,10 @@ async fn chain_segment_non_linear_parent_roots() { /* * Test with a block removed. */ - let mut blocks: Vec> = - chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .map(|block| block.into()) - .collect(); + let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) + .into_iter() + .map(|block| block.into()) + .collect(); blocks.remove(2); assert!( @@ -321,11 +319,10 @@ async fn chain_segment_non_linear_parent_roots() { /* * Test with a modified parent root. */ - let mut blocks: Vec> = - chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .map(|block| block.into()) - .collect(); + let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) + .into_iter() + .map(|block| block.into()) + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.parent_root_mut() = Hash256::zero(); @@ -357,11 +354,10 @@ async fn chain_segment_non_linear_slots() { * Test where a child is lower than the parent. */ - let mut blocks: Vec> = - chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .map(|block| block.into()) - .collect(); + let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) + .into_iter() + .map(|block| block.into()) + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = Slot::new(0); blocks[3] = Arc::new(SignedBeaconBlock::from_block(block, signature)).into(); @@ -382,11 +378,10 @@ async fn chain_segment_non_linear_slots() { * Test where a child is equal to the parent. */ - let mut blocks: Vec> = - chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .map(|block| block.into()) - .collect(); + let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) + .into_iter() + .map(|block| block.into()) + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = blocks[2].slot(); blocks[3] = Arc::new(SignedBeaconBlock::from_block(block, signature)).into(); @@ -412,10 +407,12 @@ async fn assert_invalid_signature( snapshots: &[BeaconSnapshot], item: &str, ) { - let blocks: Vec> = snapshots + let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| BlockWrapper::new(snapshot.beacon_block.clone(), blobs.clone())) + .map(|(snapshot, blobs)| { + RpcBlock::new(snapshot.beacon_block.clone(), blobs.clone()).unwrap() + }) .collect(); // Ensure the block will be rejected if imported in a chain segment. @@ -440,7 +437,9 @@ async fn assert_invalid_signature( .iter() .take(block_index) .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| BlockWrapper::new(snapshot.beacon_block.clone(), blobs.clone())) + .map(|(snapshot, blobs)| { + RpcBlock::new(snapshot.beacon_block.clone(), blobs.clone()).unwrap() + }) .collect(); // We don't care if this fails, we just call this to ensure that all prior blocks have been // imported prior to this test. @@ -454,10 +453,11 @@ async fn assert_invalid_signature( .chain .process_block( snapshots[block_index].beacon_block.canonical_root(), - BlockWrapper::new( + RpcBlock::new( snapshots[block_index].beacon_block.clone(), chain_segment_blobs[block_index].clone(), - ), + ) + .unwrap(), NotifyExecutionLayer::Yes, || Ok(()), ) @@ -509,7 +509,7 @@ async fn invalid_signature_gossip_block() { .take(block_index) .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - BlockWrapper::new(snapshot.beacon_block.clone(), blobs.clone()) + RpcBlock::new(snapshot.beacon_block.clone(), blobs.clone()).unwrap() }) .collect(); harness @@ -552,11 +552,11 @@ async fn invalid_signature_block_proposal() { block.clone(), junk_signature(), )); - let blocks: Vec> = snapshots + let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - BlockWrapper::new(snapshot.beacon_block.clone(), blobs.clone()) + RpcBlock::new(snapshot.beacon_block.clone(), blobs.clone()).unwrap() }) .collect::>(); // Ensure the block will be rejected if imported in a chain segment. @@ -765,11 +765,11 @@ async fn invalid_signature_deposit() { Arc::new(SignedBeaconBlock::from_block(block, signature)); update_parent_roots(&mut snapshots); update_proposal_signatures(&mut snapshots, &harness); - let blocks: Vec> = snapshots + let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - BlockWrapper::new(snapshot.beacon_block.clone(), blobs.clone()) + RpcBlock::new(snapshot.beacon_block.clone(), blobs.clone()).unwrap() }) .collect(); assert!( diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 60ba8f288..fb7cc516f 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -221,7 +221,7 @@ impl InvalidPayloadRig { let head = self.harness.chain.head_snapshot(); let state = head.beacon_state.clone_with_only_committee_caches(); let slot = slot_override.unwrap_or(state.slot() + 1); - let ((block, _), post_state) = self.harness.make_block(state, slot).await; + let ((block, blobs), post_state) = self.harness.make_block(state, slot).await; let block_root = block.canonical_root(); let set_new_payload = |payload: Payload| match payload { @@ -285,7 +285,7 @@ impl InvalidPayloadRig { } let root = self .harness - .process_block(slot, block.canonical_root(), block.clone()) + .process_block(slot, block.canonical_root(), (block.clone(), blobs.clone())) .await .unwrap(); @@ -326,7 +326,7 @@ impl InvalidPayloadRig { match self .harness - .process_block(slot, block.canonical_root(), block) + .process_block(slot, block.canonical_root(), (block, blobs)) .await { Err(error) if evaluate_error(&error) => (), diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 7a86b5f93..18f1cbd7c 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1,7 +1,7 @@ #![cfg(not(debug_assertions))] use beacon_chain::attestation_verification::Error as AttnError; -use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::builder::BeaconChainBuilder; use beacon_chain::schema_change::migrate_schema; use beacon_chain::test_utils::{ @@ -10,7 +10,7 @@ use beacon_chain::test_utils::{ }; use beacon_chain::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD; use beacon_chain::{ - blob_verification::MaybeAvailableBlock, historical_blocks::HistoricalBlockError, + data_availability_checker::MaybeAvailableBlock, historical_blocks::HistoricalBlockError, migrate::MigratorConfig, BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, ChainConfig, NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped, }; @@ -2039,7 +2039,10 @@ async fn garbage_collect_temp_states_from_failed_block() { // The block should be rejected, but should store a bunch of temporary states. harness.set_current_slot(block_slot); - harness.process_block_result(block).await.unwrap_err(); + harness + .process_block_result((block, None)) + .await + .unwrap_err(); assert_eq!( store.iter_temporary_state_roots().count(), @@ -2176,7 +2179,7 @@ async fn weak_subjectivity_sync() { beacon_chain .process_block( full_block.canonical_root(), - BlockWrapper::new(Arc::new(full_block), blobs), + RpcBlock::new(Arc::new(full_block), blobs).unwrap(), NotifyExecutionLayer::Yes, || Ok(()), ) @@ -2236,7 +2239,7 @@ async fn weak_subjectivity_sync() { if let MaybeAvailableBlock::Available(block) = harness .chain .data_availability_checker - .check_availability(BlockWrapper::new(Arc::new(full_block), blobs)) + .check_rpc_block_availability(RpcBlock::new(Arc::new(full_block), blobs).unwrap()) .expect("should check availability") { available_blocks.push(block); @@ -2456,14 +2459,14 @@ async fn revert_minority_fork_on_resume() { harness1.process_attestations(attestations.clone()); harness2.process_attestations(attestations); - let ((block, _), new_state) = harness1.make_block(state, slot).await; + let ((block, blobs), new_state) = harness1.make_block(state, slot).await; harness1 - .process_block(slot, block.canonical_root(), block.clone()) + .process_block(slot, block.canonical_root(), (block.clone(), blobs.clone())) .await .unwrap(); harness2 - .process_block(slot, block.canonical_root(), block.clone()) + .process_block(slot, block.canonical_root(), (block.clone(), blobs.clone())) .await .unwrap(); @@ -2497,17 +2500,17 @@ async fn revert_minority_fork_on_resume() { harness2.process_attestations(attestations); // Minority chain block (no attesters). - let ((block1, _), new_state1) = harness1.make_block(state1, slot).await; + let ((block1, blobs1), new_state1) = harness1.make_block(state1, slot).await; harness1 - .process_block(slot, block1.canonical_root(), block1) + .process_block(slot, block1.canonical_root(), (block1, blobs1)) .await .unwrap(); state1 = new_state1; // Majority chain block (all attesters). - let ((block2, _), new_state2) = harness2.make_block(state2, slot).await; + let ((block2, blobs2), new_state2) = harness2.make_block(state2, slot).await; harness2 - .process_block(slot, block2.canonical_root(), block2.clone()) + .process_block(slot, block2.canonical_root(), (block2.clone(), blobs2)) .await .unwrap(); @@ -2560,7 +2563,7 @@ async fn revert_minority_fork_on_resume() { let initial_split_slot = resumed_harness.chain.store.get_split_slot(); for block in &majority_blocks { resumed_harness - .process_block_result(block.clone()) + .process_block_result((block.clone(), None)) .await .unwrap(); diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 542516081..38c40b890 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -1,6 +1,6 @@ use crate::metrics; -use beacon_chain::blob_verification::AsBlock; +use beacon_chain::block_verification_types::AsBlock; use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now}; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 40f21f727..f27a4d951 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -636,7 +636,7 @@ pub async fn proposer_boost_re_org_test( // Applying block C should cause it to become head regardless (re-org or continuation). let block_root_c = harness - .process_block_result(block_c.clone()) + .process_block_result((block_c.clone(), None)) .await .unwrap() .into(); diff --git a/beacon_node/http_api/tests/status_tests.rs b/beacon_node/http_api/tests/status_tests.rs index 664c5f2ea..b3d1e9daa 100644 --- a/beacon_node/http_api/tests/status_tests.rs +++ b/beacon_node/http_api/tests/status_tests.rs @@ -124,7 +124,7 @@ async fn el_error_on_new_payload() { // Attempt to process the block, which should error. harness.advance_slot(); assert!(matches!( - harness.process_block_result(block.clone()).await, + harness.process_block_result((block.clone(), None)).await, Err(BlockError::ExecutionPayloadError(_)) )); @@ -143,7 +143,7 @@ async fn el_error_on_new_payload() { validation_error: None, }, ); - harness.process_block_result(block).await.unwrap(); + harness.process_block_result((block, None)).await.unwrap(); let api_response = tester.client.get_node_syncing().await.unwrap().data; assert_eq!(api_response.el_offline, Some(false)); diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 000c4d85d..2255b4017 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -5,8 +5,8 @@ use crate::{ sync::SyncMessage, }; -use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::{BlobError, GossipVerifiedBlob}; +use beacon_chain::block_verification_types::AsBlock; use beacon_chain::store::Error; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 8e44aba15..4a214c363 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -2,7 +2,7 @@ use crate::{ service::NetworkMessage, sync::{manager::BlockProcessType, SyncMessage}, }; -use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{ builder::Witness, eth1_chain::CachingEth1Backend, test_utils::BeaconChainHarness, BeaconChain, }; @@ -409,7 +409,7 @@ impl NetworkBeaconProcessor { pub fn send_rpc_beacon_block( self: &Arc, block_root: Hash256, - block: BlockWrapper, + block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Result<(), Error> { @@ -450,7 +450,7 @@ impl NetworkBeaconProcessor { pub fn send_chain_segment( self: &Arc, process_id: ChainSegmentProcessId, - blocks: Vec>, + blocks: Vec>, ) -> Result<(), Error> { let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. }); let processor = self.clone(); diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 8d9146e68..b21bc6abd 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -6,8 +6,9 @@ use crate::sync::{ manager::{BlockProcessType, SyncMessage}, ChainId, }; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper, MaybeAvailableBlock}; +use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; +use beacon_chain::data_availability_checker::MaybeAvailableBlock; use beacon_chain::{ observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms, AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, @@ -54,7 +55,7 @@ impl NetworkBeaconProcessor { pub fn generate_rpc_beacon_block_process_fn( self: Arc, block_root: Hash256, - block: BlockWrapper, + block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, ) -> AsyncFn { @@ -78,7 +79,7 @@ impl NetworkBeaconProcessor { pub fn generate_rpc_beacon_block_fns( self: Arc, block_root: Hash256, - block: BlockWrapper, + block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, ) -> (AsyncFn, BlockingFn) { @@ -106,7 +107,7 @@ impl NetworkBeaconProcessor { pub async fn process_rpc_block( self: Arc>, block_root: Hash256, - block: BlockWrapper, + block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, reprocess_tx: mpsc::Sender, @@ -315,7 +316,7 @@ impl NetworkBeaconProcessor { pub async fn process_chain_segment( &self, sync_type: ChainSegmentProcessId, - downloaded_blocks: Vec>, + downloaded_blocks: Vec>, notify_execution_layer: NotifyExecutionLayer, ) { let result = match sync_type { @@ -440,7 +441,7 @@ impl NetworkBeaconProcessor { /// Helper function to process blocks batches which only consumes the chain and blocks to process. async fn process_blocks<'a>( &self, - downloaded_blocks: impl Iterator>, + downloaded_blocks: impl Iterator>, notify_execution_layer: NotifyExecutionLayer, ) -> (usize, Result<(), ChainSegmentFailed>) { let blocks: Vec<_> = downloaded_blocks.cloned().collect(); @@ -473,7 +474,7 @@ impl NetworkBeaconProcessor { /// Helper function to process backfill block batches which only consumes the chain and blocks to process. fn process_backfill_blocks( &self, - downloaded_blocks: Vec>, + downloaded_blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); let available_blocks = match downloaded_blocks @@ -481,7 +482,7 @@ impl NetworkBeaconProcessor { .map(|block| { self.chain .data_availability_checker - .check_availability(block) + .check_rpc_block_availability(block) }) .collect::, _>>() { diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index e5a142800..0900e034f 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -14,7 +14,7 @@ use crate::sync::network_context::SyncNetworkContext; use crate::sync::range_sync::{ BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, }; -use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::types::{BackFillState, NetworkGlobals}; use lighthouse_network::{PeerAction, PeerId}; @@ -55,7 +55,7 @@ impl BatchConfig for BackFillBatchConfig { fn max_batch_processing_attempts() -> u8 { MAX_BATCH_PROCESSING_ATTEMPTS } - fn batch_attempt_hash(blocks: &[BlockWrapper]) -> u64 { + fn batch_attempt_hash(blocks: &[RpcBlock]) -> u64 { use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; let mut hasher = DefaultHasher::new(); @@ -392,7 +392,7 @@ impl BackFillSync { batch_id: BatchId, peer_id: &PeerId, request_id: Id, - beacon_block: Option>, + beacon_block: Option>, ) -> Result { // check if we have this batch let batch = match self.batches.get_mut(&batch_id) { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index dfe960832..ff095c719 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -10,7 +10,7 @@ use super::{ use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::block_lookups::single_block_lookup::LookupId; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; +use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use lighthouse_network::rpc::RPCError; @@ -34,7 +34,7 @@ mod single_block_lookup; #[cfg(test)] mod tests; -pub type DownloadedBlocks = (Hash256, BlockWrapper); +pub type DownloadedBlocks = (Hash256, RpcBlock); pub type RootBlockTuple = (Hash256, Arc>); pub type RootBlobsTuple = (Hash256, FixedBlobSidecarList); @@ -381,7 +381,7 @@ impl BlockLookups { if !has_pending_parent_request { let rpc_block = request_ref .get_downloaded_block() - .unwrap_or(BlockWrapper::Block(block)); + .unwrap_or(RpcBlock::new_without_blobs(block)); // This is the correct block, send it for processing match self.send_block_for_processing( block_root, @@ -910,11 +910,7 @@ impl BlockLookups { BlockError::ParentUnknown(block) => { let slot = block.slot(); let parent_root = block.parent_root(); - let (block, blobs) = block.deconstruct(); - request_ref.add_unknown_parent_components(UnknownParentComponents::new( - Some(block), - blobs, - )); + request_ref.add_unknown_parent_components(block.into()); self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx); ShouldRemoveLookup::False } @@ -1226,7 +1222,7 @@ impl BlockLookups { fn send_block_for_processing( &mut self, block_root: Hash256, - block: BlockWrapper, + block: RpcBlock, duration: Duration, process_type: BlockProcessType, cx: &mut SyncNetworkContext, diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 5175450d9..6d870b5ab 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -3,8 +3,8 @@ use super::{BlobRequestId, BlockRequestId, DownloadedBlocks, PeerShouldHave, Res use crate::sync::block_lookups::single_block_lookup::{State, UnknownParentComponents}; use crate::sync::block_lookups::{RootBlobsTuple, RootBlockTuple}; use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; -use beacon_chain::blob_verification::AsBlock; -use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::block_verification_types::AsBlock; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::BeaconChainTypes; use lighthouse_network::PeerId; @@ -147,7 +147,7 @@ impl ParentLookup { .check_peer_disconnected(peer_id) } - pub fn add_unknown_parent_block(&mut self, block: BlockWrapper) { + pub fn add_unknown_parent_block(&mut self, block: RpcBlock) { let next_parent = block.parent_root(); // Cache the block. @@ -203,7 +203,7 @@ impl ParentLookup { self, ) -> ( Hash256, - Vec>, + Vec>, Vec, SingleBlockLookup, ) { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index b6e0cef83..90829905b 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,6 +1,6 @@ use crate::sync::block_lookups::{BlobRequestId, BlockRequestId, RootBlobsTuple, RootBlockTuple}; use crate::sync::network_context::SyncNetworkContext; -use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::{get_block_root, BeaconChainTypes}; use lighthouse_network::rpc::methods::BlobsByRootRequest; @@ -138,6 +138,16 @@ pub struct UnknownParentComponents { pub downloaded_blobs: FixedBlobSidecarList, } +impl From> for UnknownParentComponents { + fn from(value: RpcBlock) -> Self { + let (block, blobs) = value.deconstruct(); + let fixed_blobs = blobs.map(|blobs| { + FixedBlobSidecarList::from(blobs.into_iter().map(Some).collect::>()) + }); + Self::new(Some(block), fixed_blobs) + } +} + impl UnknownParentComponents { pub fn new( block: Option>>, @@ -284,7 +294,7 @@ impl SingleBlockLookup Option> { + pub fn get_downloaded_block(&mut self) -> Option> { self.unknown_parent_components .as_mut() .and_then(|components| { @@ -302,8 +312,16 @@ impl SingleBlockLookup>(); + let blobs = VariableList::from(filtered); + RpcBlock::new(block.clone(), Some(blobs)).ok() }) } else { None diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 80b0f3c79..2d523b048 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1474,7 +1474,7 @@ mod deneb_only { fn parent_block_unknown_parent(mut self) -> Self { self.bl.parent_block_processed( self.block_root, - BlockProcessingResult::Err(BlockError::ParentUnknown(BlockWrapper::Block( + BlockProcessingResult::Err(BlockError::ParentUnknown(RpcBlock::new_without_blobs( self.parent_block.clone().expect("parent block"), ))), ResponseType::Block, diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 7e5362a6f..fce7a2e30 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,5 +1,5 @@ -use beacon_chain::blob_verification::BlockWrapper; -use ssz_types::FixedVector; +use beacon_chain::block_verification_types::RpcBlock; +use ssz_types::VariableList; use std::{collections::VecDeque, sync::Arc}; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; @@ -16,28 +16,28 @@ pub struct BlocksAndBlobsRequestInfo { } impl BlocksAndBlobsRequestInfo { - pub fn add_block_response(&mut self, maybe_block: Option>>) { - match maybe_block { + pub fn add_block_response(&mut self, block_opt: Option>>) { + match block_opt { Some(block) => self.accumulated_blocks.push_back(block), None => self.is_blocks_stream_terminated = true, } } - pub fn add_sidecar_response(&mut self, maybe_sidecar: Option>>) { - match maybe_sidecar { + pub fn add_sidecar_response(&mut self, sidecar_opt: Option>>) { + match sidecar_opt { Some(sidecar) => self.accumulated_sidecars.push_back(sidecar), None => self.is_sidecars_stream_terminated = true, } } - pub fn into_responses(self) -> Result>, &'static str> { + pub fn into_responses(self) -> Result>, &'static str> { let BlocksAndBlobsRequestInfo { accumulated_blocks, accumulated_sidecars, .. } = self; - // ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any blob (empty + // There can't be more more blobs than blocks. i.e. sending any blob (empty // included) for a skipped slot is not permitted. let mut responses = Vec::with_capacity(accumulated_blocks.len()); let mut blob_iter = accumulated_sidecars.into_iter().peekable(); @@ -50,29 +50,23 @@ impl BlocksAndBlobsRequestInfo { .unwrap_or(false); pair_next_blob } { - blob_list.push(blob_iter.next().expect("iterator is not empty")); + blob_list.push(blob_iter.next().ok_or("Missing next blob")?); } - if blob_list.is_empty() { - responses.push(BlockWrapper::Block(block)) - } else { - let mut blobs_fixed = vec![None; T::max_blobs_per_block()]; - for blob in blob_list { - let blob_index = blob.index as usize; - let Some(blob_opt) = blobs_fixed.get_mut(blob_index) else { + let mut blobs_buffer = vec![None; T::max_blobs_per_block()]; + for blob in blob_list { + let blob_index = blob.index as usize; + let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else { return Err("Invalid blob index"); }; - if blob_opt.is_some() { - return Err("Repeat blob index"); - } else { - *blob_opt = Some(blob); - } + if blob_opt.is_some() { + return Err("Repeat blob index"); + } else { + *blob_opt = Some(blob); } - responses.push(BlockWrapper::BlockAndBlobs( - block, - FixedVector::from(blobs_fixed), - )) } + let blobs = VariableList::from(blobs_buffer.into_iter().flatten().collect::>()); + responses.push(RpcBlock::new(block, Some(blobs))?) } // if accumulated sidecars is not empty, throw an error. diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 93b7c9af5..5e8fc4a4e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -46,8 +46,8 @@ use crate::sync::block_lookups::delayed_lookup::DelayedLookupMessage; pub use crate::sync::block_lookups::ResponseType; use crate::sync::block_lookups::UnknownParentComponents; use crate::sync::range_sync::ByRangeRequestType; -use beacon_chain::blob_verification::AsBlock; -use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::block_verification_types::AsBlock; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState, MAXIMUM_GOSSIP_CLOCK_DISPARITY, @@ -127,7 +127,7 @@ pub enum SyncMessage { }, /// A block with an unknown parent has been received. - UnknownParentBlock(PeerId, BlockWrapper, Hash256), + UnknownParentBlock(PeerId, RpcBlock, Hash256), /// A blob with an unknown parent has been received. UnknownParentBlob(PeerId, Arc>), @@ -614,15 +614,13 @@ impl SyncManager { } => self.rpc_blob_received(request_id, peer_id, blob_sidecar, seen_timestamp), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); - let (block, blobs) = block.deconstruct(); let parent_root = block.parent_root(); - let parent_components = UnknownParentComponents::new(Some(block), blobs); self.handle_unknown_parent( peer_id, block_root, parent_root, block_slot, - Some(parent_components), + Some(block.into()), ); } SyncMessage::UnknownParentBlob(peer_id, blob) => { @@ -910,7 +908,7 @@ impl SyncManager { batch_id, &peer_id, id, - block.map(BlockWrapper::Block), + block.map(Into::into), ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::Successful) => {} @@ -934,7 +932,7 @@ impl SyncManager { chain_id, batch_id, id, - block.map(BlockWrapper::Block), + block.map(Into::into), ); self.update_sync_state(); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 7c162f478..d635dd2ea 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -8,7 +8,7 @@ use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{BlobRequestId, BlockRequestId}; -use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; @@ -22,7 +22,7 @@ use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; pub struct BlocksAndBlobsByRangeResponse { pub batch_id: BatchId, - pub responses: Result>, &'static str>, + pub responses: Result>, &'static str>, } pub struct BlocksAndBlobsByRangeRequest { diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 138d32096..f5c320cb8 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,5 +1,5 @@ use crate::sync::manager::Id; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; +use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use lighthouse_network::rpc::methods::BlocksByRangeRequest; use lighthouse_network::PeerId; use std::collections::HashSet; @@ -56,7 +56,7 @@ pub trait BatchConfig { /// Note that simpler hashing functions considered in the past (hash of first block, hash of last /// block, number of received blocks) are not good enough to differentiate attempts. For this /// reason, we hash the complete set of blocks both in RangeSync and BackFillSync. - fn batch_attempt_hash(blocks: &[BlockWrapper]) -> u64; + fn batch_attempt_hash(blocks: &[RpcBlock]) -> u64; } pub struct RangeSyncBatchConfig {} @@ -68,7 +68,7 @@ impl BatchConfig for RangeSyncBatchConfig { fn max_batch_processing_attempts() -> u8 { MAX_BATCH_PROCESSING_ATTEMPTS } - fn batch_attempt_hash(blocks: &[BlockWrapper]) -> u64 { + fn batch_attempt_hash(blocks: &[RpcBlock]) -> u64 { let mut hasher = std::collections::hash_map::DefaultHasher::new(); blocks.hash(&mut hasher); hasher.finish() @@ -116,9 +116,9 @@ pub enum BatchState { /// The batch has failed either downloading or processing, but can be requested again. AwaitingDownload, /// The batch is being downloaded. - Downloading(PeerId, Vec>, Id), + Downloading(PeerId, Vec>, Id), /// The batch has been completely downloaded and is ready for processing. - AwaitingProcessing(PeerId, Vec>), + AwaitingProcessing(PeerId, Vec>), /// The batch is being processed. Processing(Attempt), /// The batch was successfully processed and is waiting to be validated. @@ -251,7 +251,7 @@ impl BatchInfo { } /// Adds a block to a downloading batch. - pub fn add_block(&mut self, block: BlockWrapper) -> Result<(), WrongState> { + pub fn add_block(&mut self, block: RpcBlock) -> Result<(), WrongState> { match self.state.poison() { BatchState::Downloading(peer, mut blocks, req_id) => { blocks.push(block); @@ -383,7 +383,7 @@ impl BatchInfo { } } - pub fn start_processing(&mut self) -> Result>, WrongState> { + pub fn start_processing(&mut self) -> Result>, WrongState> { match self.state.poison() { BatchState::AwaitingProcessing(peer, blocks) => { self.state = BatchState::Processing(Attempt::new::(peer, &blocks)); @@ -481,7 +481,7 @@ pub struct Attempt { } impl Attempt { - fn new(peer_id: PeerId, blocks: &[BlockWrapper]) -> Self { + fn new(peer_id: PeerId, blocks: &[RpcBlock]) -> Self { let hash = B::batch_attempt_hash(blocks); Attempt { peer_id, hash } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 3b3cdb6ae..4d399b5cb 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -3,7 +3,7 @@ use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::{ manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult, }; -use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; @@ -221,7 +221,7 @@ impl SyncingChain { batch_id: BatchId, peer_id: &PeerId, request_id: Id, - beacon_block: Option>, + beacon_block: Option>, ) -> ProcessingResult { // check if we have this batch let batch = match self.batches.get_mut(&batch_id) { diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 28cdc7afc..4ca518f98 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -47,7 +47,7 @@ use crate::status::ToStatusMessage; use crate::sync::manager::Id; use crate::sync::network_context::SyncNetworkContext; use crate::sync::BatchProcessResult; -use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::rpc::GoodbyeReason; use lighthouse_network::PeerId; @@ -210,7 +210,7 @@ where chain_id: ChainId, batch_id: BatchId, request_id: Id, - beacon_block: Option>, + beacon_block: Option>, ) { // check if this chunk removes the chain match self.chains.call_by_id(chain_id, |chain| { @@ -387,19 +387,18 @@ mod tests { use beacon_chain::builder::Witness; use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::parking_lot::RwLock; + use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; use beacon_chain::EngineState; use beacon_processor::WorkEvent as BeaconWorkEvent; use lighthouse_network::rpc::BlocksByRangeRequest; use lighthouse_network::Request; use lighthouse_network::{rpc::StatusMessage, NetworkGlobals}; use slog::{o, Drain}; - use tokio::sync::mpsc; - - use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; use slot_clock::TestingSlotClock; use std::collections::HashSet; use std::sync::Arc; use store::MemoryStore; + use tokio::sync::mpsc; use types::{Hash256, MinimalEthSpec as E}; #[derive(Debug)] diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 88fb7d896..9d39eb3e3 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -1,9 +1,5 @@ #![cfg(not(debug_assertions))] -use std::fmt; -use std::sync::Mutex; -use std::time::Duration; - use beacon_chain::test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, }; @@ -14,6 +10,9 @@ use beacon_chain::{ use fork_choice::{ ForkChoiceStore, InvalidAttestation, InvalidBlock, PayloadVerificationStatus, QueuedAttestation, }; +use std::fmt; +use std::sync::Mutex; +use std::time::Duration; use store::MemoryStore; use types::{ test_utils::generate_deterministic_keypair, BeaconBlockRef, BeaconState, ChainSpec, Checkpoint, @@ -195,17 +194,18 @@ impl ForkChoiceTest { let validators = self.harness.get_all_validators(); loop { let slot = self.harness.get_current_slot(); - let (block, state_) = self.harness.make_block(state, slot).await; + let (block_contents, state_) = self.harness.make_block(state, slot).await; state = state_; - if !predicate(block.0.message(), &state) { + if !predicate(block_contents.0.message(), &state) { break; } - if let Ok(block_hash) = self.harness.process_block_result(block.clone()).await { + let block = block_contents.0.clone(); + if let Ok(block_hash) = self.harness.process_block_result(block_contents).await { self.harness.attest_block( &state, - block.0.state_root(), + block.state_root(), block_hash, - &block.0, + &block, &validators, ); self.harness.advance_slot(); diff --git a/consensus/state_processing/src/consensus_context.rs b/consensus/state_processing/src/consensus_context.rs index 78803ab4e..8e49a0d49 100644 --- a/consensus/state_processing/src/consensus_context.rs +++ b/consensus/state_processing/src/consensus_context.rs @@ -21,8 +21,6 @@ pub struct ConsensusContext { #[ssz(skip_serializing, skip_deserializing)] indexed_attestations: HashMap<(AttestationData, BitList), IndexedAttestation>, - /// Whether `verify_kzg_commitments_against_transactions` has successfully passed. - kzg_commitments_consistent: bool, } #[derive(Debug, PartialEq, Clone)] @@ -45,7 +43,6 @@ impl ConsensusContext { proposer_index: None, current_block_root: None, indexed_attestations: HashMap::new(), - kzg_commitments_consistent: false, } } @@ -161,13 +158,4 @@ impl ConsensusContext { pub fn num_cached_indexed_attestations(&self) -> usize { self.indexed_attestations.len() } - - pub fn set_kzg_commitments_consistent(mut self, kzg_commitments_consistent: bool) -> Self { - self.kzg_commitments_consistent = kzg_commitments_consistent; - self - } - - pub fn kzg_commitments_consistent(&self) -> bool { - self.kzg_commitments_consistent - } } diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 58b7aaeea..3b067e0ce 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -169,9 +169,10 @@ pub use crate::selection_proof::SelectionProof; pub use crate::shuffling_id::AttestationShufflingId; pub use crate::signed_aggregate_and_proof::SignedAggregateAndProof; pub use crate::signed_beacon_block::{ - ssz_tagged_signed_beacon_block, SignedBeaconBlock, SignedBeaconBlockAltair, - SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockHash, - SignedBeaconBlockMerge, SignedBlindedBeaconBlock, + ssz_tagged_signed_beacon_block, ssz_tagged_signed_beacon_block_arc, SignedBeaconBlock, + SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockCapella, + SignedBeaconBlockDeneb, SignedBeaconBlockHash, SignedBeaconBlockMerge, + SignedBlindedBeaconBlock, }; pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader; pub use crate::signed_blob::*; diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index cf7fd6819..c52ba11d2 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -642,6 +642,27 @@ pub mod ssz_tagged_signed_beacon_block { } } +pub mod ssz_tagged_signed_beacon_block_arc { + use super::*; + pub mod encode { + pub use super::ssz_tagged_signed_beacon_block::encode::*; + } + + pub mod decode { + pub use super::ssz_tagged_signed_beacon_block::decode::{is_ssz_fixed_len, ssz_fixed_len}; + use super::*; + #[allow(unused_imports)] + use ssz::*; + use std::sync::Arc; + + pub fn from_ssz_bytes>( + bytes: &[u8], + ) -> Result>, DecodeError> { + ssz_tagged_signed_beacon_block::decode::from_ssz_bytes(bytes).map(Arc::new) + } + } +} + #[cfg(test)] mod test { use super::*;