diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e5d06b78c..7f857c58d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -104,12 +104,13 @@ use store::{ use task_executor::{ShutdownReason, TaskExecutor}; use tree_hash::TreeHash; use types::beacon_state::CloneConfig; +use types::signed_block_and_blobs::BlockWrapper; use types::*; pub type ForkChoiceError = fork_choice::Error; /// Alias to appease clippy. -type HashBlockTuple = (Hash256, Arc>); +type HashBlockTuple = (Hash256, BlockWrapper); /// The time-out before failure during an operation to take a read/write RwLock on the block /// processing cache. @@ -2278,7 +2279,7 @@ impl BeaconChain { /// This method is potentially long-running and should not run on the core executor. pub fn filter_chain_segment( self: &Arc, - chain_segment: Vec>>, + chain_segment: Vec>, ) -> Result>, ChainSegmentResult> { // This function will never import any blocks. let imported_blocks = 0; @@ -2290,19 +2291,19 @@ impl BeaconChain { let children = chain_segment .iter() .skip(1) - .map(|block| (block.parent_root(), block.slot())) + .map(|block| (block.block().parent_root(), block.slot())) .collect::>(); for (i, block) in chain_segment.into_iter().enumerate() { // Ensure the block is the correct structure for the fork at `block.slot()`. - if let Err(e) = block.fork_name(&self.spec) { + if let Err(e) = block.block().fork_name(&self.spec) { return Err(ChainSegmentResult::Failed { imported_blocks, error: BlockError::InconsistentFork(e), }); } - let block_root = get_block_root(&block); + let block_root = get_block_root(block.block()); if let Some((child_parent_root, child_slot)) = children.get(i) { // If this block has a child in this chain segment, ensure that its parent root matches @@ -2326,7 +2327,7 @@ impl BeaconChain { } } - match check_block_relevancy(&block, block_root, self) { + match check_block_relevancy(block.block(), block_root, self) { // If the block is relevant, add it to the filtered chain segment. Ok(_) => filtered_chain_segment.push((block_root, block)), // If the block is already known, simply ignore this block. @@ -2384,7 +2385,7 @@ impl BeaconChain { /// `Self::process_block`. pub async fn process_chain_segment( self: &Arc, - chain_segment: Vec>>, + chain_segment: Vec>, count_unrealized: CountUnrealized, ) -> ChainSegmentResult { let mut imported_blocks = 0; @@ -5246,6 +5247,14 @@ impl BeaconChain { gossip_attested || block_attested || aggregated || produced_block } + + /// The epoch at which we require a data availability check in block processing. + /// `None` if the `Eip4844` fork is disabled. + pub fn data_availability_boundary(&self) -> Option { + self.spec + .eip4844_fork_epoch + .map(|e| std::cmp::max(e, self.head().finalized_checkpoint().epoch)) + } } impl Drop for BeaconChain { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 036102baf..912dff112 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -87,6 +87,7 @@ use std::time::Duration; use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp}; use task_executor::JoinHandle; use tree_hash::TreeHash; +use types::signed_block_and_blobs::BlockWrapper; use types::{ BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch, EthSpec, ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, @@ -548,17 +549,18 @@ fn process_block_slash_info( /// The given `chain_segment` must contain only blocks from the same epoch, otherwise an error /// will be returned. pub fn signature_verify_chain_segment( - mut chain_segment: Vec<(Hash256, Arc>)>, + mut chain_segment: Vec<(Hash256, BlockWrapper)>, chain: &BeaconChain, ) -> Result>, BlockError> { if chain_segment.is_empty() { return Ok(vec![]); } - let (first_root, first_block) = chain_segment.remove(0); - let (mut parent, first_block) = load_parent(first_root, first_block, chain)?; + let (first_root, first_block_wrapper) = chain_segment.remove(0); + let (mut parent, first_block) = + load_parent(first_root, first_block_wrapper.block_cloned(), chain)?; let slot = first_block.slot(); - chain_segment.insert(0, (first_root, first_block)); + chain_segment.insert(0, (first_root, first_block_wrapper)); let highest_slot = chain_segment .last() @@ -576,7 +578,7 @@ pub fn signature_verify_chain_segment( let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); for (block_root, block) in &chain_segment { - signature_verifier.include_all_signatures(block, Some(*block_root), None)?; + signature_verifier.include_all_signatures(block.block(), Some(*block_root), None)?; } if signature_verifier.verify().is_err() { @@ -585,19 +587,16 @@ pub fn signature_verify_chain_segment( drop(pubkey_cache); - //FIXME(sean) batch verify kzg blobs - // TODO(pawan): do not have the sidecar here, maybe validate kzg proofs in a different function - // with relevant block params? - let mut signature_verified_blocks = chain_segment .into_iter() .map(|(block_root, block)| { // Proposer index has already been verified above during signature verification. let consensus_context = ConsensusContext::new(block.slot()) .set_current_block_root(block_root) - .set_proposer_index(block.message().proposer_index()); + .set_proposer_index(block.block().message().proposer_index()) + .set_blobs_sidecar(block.blocks_sidecar()); SignatureVerifiedBlock { - block, + block: block.block_cloned(), block_root, parent: None, consensus_context, @@ -1103,7 +1102,6 @@ impl IntoExecutionPendingBlock for SignatureVerifiedBloc ExecutionPendingBlock::from_signature_verified_components( block, - self.consensus_context.blobs_sidecar(), block_root, parent, self.consensus_context, @@ -1148,7 +1146,6 @@ impl ExecutionPendingBlock { /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn from_signature_verified_components( block: Arc>, - blobs: Option>>, block_root: Hash256, parent: PreProcessingSnapshot, mut consensus_context: ConsensusContext, @@ -1483,58 +1480,58 @@ impl ExecutionPendingBlock { }); } - // TODO: are we guaranteed that consensus_context.blobs_sidecar == blobs ? - /* Verify kzg proofs and kzg commitments against transactions if required */ - if let Some(ref sidecar) = blobs { - let kzg = chain.kzg.as_ref().ok_or(BlockError::BlobValidation( - BlobError::TrustedSetupNotInitialized, - ))?; - let transactions = block - .message() - .body() - .execution_payload_eip4844() - .map(|payload| payload.transactions()) - .map_err(|_| BlockError::BlobValidation(BlobError::TransactionsMissing))? - .ok_or(BlockError::BlobValidation(BlobError::TransactionsMissing))?; - let kzg_commitments = block - .message() - .body() - .blob_kzg_commitments() - .map_err(|_| BlockError::BlobValidation(BlobError::KzgCommitmentMissing))?; - if !consensus_context.blobs_sidecar_validated() { - if !kzg_utils::validate_blobs_sidecar( - &kzg, - block.slot(), - block_root, - kzg_commitments, - sidecar, - ) - .map_err(|e| BlockError::BlobValidation(BlobError::KzgError(e)))? - { - return Err(BlockError::BlobValidation(BlobError::InvalidKzgProof)); + /* + * Verify kzg proofs and kzg commitments against transactions if required + */ + if let Some(ref sidecar) = consensus_context.blobs_sidecar() { + if let Some(data_availability_boundary) = chain.data_availability_boundary() { + if block_slot.epoch(T::EthSpec::slots_per_epoch()) > data_availability_boundary { + let kzg = chain.kzg.as_ref().ok_or(BlockError::BlobValidation( + BlobError::TrustedSetupNotInitialized, + ))?; + let transactions = block + .message() + .body() + .execution_payload_eip4844() + .map(|payload| payload.transactions()) + .map_err(|_| BlockError::BlobValidation(BlobError::TransactionsMissing))? + .ok_or(BlockError::BlobValidation(BlobError::TransactionsMissing))?; + let kzg_commitments = + block.message().body().blob_kzg_commitments().map_err(|_| { + BlockError::BlobValidation(BlobError::KzgCommitmentMissing) + })?; + if !consensus_context.blobs_sidecar_validated() { + if !kzg_utils::validate_blobs_sidecar( + &kzg, + block.slot(), + block_root, + kzg_commitments, + sidecar, + ) + .map_err(|e| BlockError::BlobValidation(BlobError::KzgError(e)))? + { + return Err(BlockError::BlobValidation(BlobError::InvalidKzgProof)); + } + } + if !consensus_context.blobs_verified_vs_txs() + && verify_kzg_commitments_against_transactions::( + transactions, + kzg_commitments, + ) + //FIXME(sean) we should maybe just map this error so we have more info about the mismatch + .is_err() + { + return Err(BlockError::BlobValidation( + BlobError::TransactionCommitmentMismatch, + )); + } } } - if !consensus_context.blobs_verified_vs_txs() - && verify_kzg_commitments_against_transactions::( - transactions, - kzg_commitments, - ) - .is_err() - { - return Err(BlockError::BlobValidation( - BlobError::TransactionCommitmentMismatch, - )); - } - // TODO(pawan): confirm with sean. are we expected to set the context here? because the ConsensusContext - // setters don't take mutable references. - // Set the consensus context after completing the required kzg valdiations - // consensus_context.set_blobs_sidecar_validated(true); - // consensus_context.set_blobs_verified_vs_txs(true); } Ok(Self { block, - blobs, + blobs: consensus_context.blobs_sidecar(), block_root, state, parent_block: parent.beacon_block, diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index db4e9dc07..718a17ea7 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -61,6 +61,7 @@ use std::time::Duration; use std::{cmp, collections::HashSet}; use task_executor::TaskExecutor; use tokio::sync::mpsc; +use types::signed_block_and_blobs::BlockWrapper; use types::{ Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedBlsToExecutionChange, @@ -1762,8 +1763,13 @@ impl BeaconProcessor { /* * Verification for a chain segment (multiple blocks). */ - Work::ChainSegment { process_id, blocks } => task_spawner - .spawn_async(async move { worker.process_chain_segment(process_id, blocks).await }), + Work::ChainSegment { process_id, blocks } => task_spawner.spawn_async(async move { + let wrapped = blocks + .into_iter() + .map(|block| BlockWrapper::Block { block }) + .collect(); + worker.process_chain_segment(process_id, wrapped).await + }), /* * Processing of Status Messages. */ @@ -1867,9 +1873,13 @@ impl BeaconProcessor { process_id, blocks_and_blobs, } => task_spawner.spawn_async(async move { - worker - .process_blob_chain_segment(process_id, blocks_and_blobs) - .await + let wrapped = blocks_and_blobs + .into_iter() + .map(|b| BlockWrapper::BlockAndBlob { + block_sidecar_pair: b, + }) + .collect(); + worker.process_chain_segment(process_id, wrapped).await }), }; } diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 7e22d4d8f..114d9805b 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -15,7 +15,11 @@ use lighthouse_network::PeerAction; use slog::{debug, error, info, warn}; use std::sync::Arc; use tokio::sync::mpsc; -use types::{Epoch, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar}; +use types::signed_block_and_blobs::BlockWrapper; +use types::{ + Epoch, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, + SignedBeaconBlockAndBlobsSidecarDecode, +}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -126,7 +130,7 @@ impl Worker { pub async fn process_chain_segment( &self, sync_type: ChainSegmentProcessId, - downloaded_blocks: Vec>>, + downloaded_blocks: Vec>, ) { let result = match sync_type { // this a request from the range sync @@ -176,7 +180,18 @@ impl Worker { let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); let sent_blocks = downloaded_blocks.len(); - match self.process_backfill_blocks(downloaded_blocks) { + let unwrapped = downloaded_blocks + .into_iter() + .map(|block| match block { + BlockWrapper::Block { block } => block, + //FIXME(sean) handle blobs in backfill + BlockWrapper::BlockAndBlob { + block_sidecar_pair: _, + } => todo!(), + }) + .collect(); + + match self.process_backfill_blocks(unwrapped) { (_, Ok(_)) => { debug!(self.log, "Backfill batch processed"; "batch_epoch" => epoch, @@ -241,24 +256,13 @@ impl Worker { self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); } - pub async fn process_blob_chain_segment( - &self, - sync_type: ChainSegmentProcessId, - downloaded_blocks: Vec>, - ) { - warn!(self.log, "FAKE PROCESSING A BLOBS SEGMENT!!!"); - let result = BatchProcessResult::Success { - was_non_empty: !downloaded_blocks.is_empty(), - }; - self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); - } /// Helper function to process blocks batches which only consumes the chain and blocks to process. async fn process_blocks<'a>( &self, - downloaded_blocks: impl Iterator>>, + downloaded_blocks: impl Iterator>, count_unrealized: CountUnrealized, ) -> (usize, Result<(), ChainSegmentFailed>) { - let blocks: Vec> = downloaded_blocks.cloned().collect(); + let blocks: Vec<_> = downloaded_blocks.cloned().collect(); match self .chain .process_chain_segment(blocks, count_unrealized) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index ffd84a9f0..4ebbf1c1d 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -647,6 +647,7 @@ impl BlockLookups { WorkEvent::rpc_beacon_block(block_root, block, duration, process_type) } BlockTy::BlockAndBlob { block_sidecar_pair } => { + //FIXME(sean) // WorkEvent::rpc_block_and_glob(block_sidecar_pair) todo!("we also need to process block-glob pairs for rpc") } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 614bf34cf..368c4f518 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -108,7 +108,7 @@ impl SingleBlockRequest { pub fn verify_block( &mut self, block: Option>, - ) -> Result)>, VerifyError> { + ) -> Result>, VerifyError> { match self.state { State::AwaitingDownload => { self.register_failure_downloading(); diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml index 6ee02b71b..de2d60fae 100644 --- a/common/eth2/Cargo.toml +++ b/common/eth2/Cargo.toml @@ -35,5 +35,5 @@ procinfo = { version = "0.4.2", optional = true } [features] default = ["lighthouse"] lighthouse = ["proto_array", "psutil", "procinfo", "store", "slashing_protection"] -withdrawals = ["store/withdrawals"] +withdrawals = ["store/withdrawals", "types/withdrawals"] withdrawals-processing = ["store/withdrawals-processing"] \ No newline at end of file diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs index 5a6b8c917..be47e66c9 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -1,4 +1,4 @@ -use crate::{BlobsSidecar, EthSpec, SignedBeaconBlock, SignedBeaconBlockEip4844}; +use crate::{BlobsSidecar, EthSpec, SignedBeaconBlock, SignedBeaconBlockEip4844, Slot}; use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, DecodeError}; use ssz_derive::{Decode, Encode}; @@ -33,7 +33,7 @@ impl SignedBeaconBlockAndBlobsSidecar { } /// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobsSidecar`]. -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum BlockWrapper { Block { block: Arc>, @@ -44,10 +44,34 @@ pub enum BlockWrapper { } impl BlockWrapper { + pub fn slot(&self) -> Slot { + match self { + BlockWrapper::Block { block } => block.slot(), + BlockWrapper::BlockAndBlob { block_sidecar_pair } => { + block_sidecar_pair.beacon_block.slot() + } + } + } pub fn block(&self) -> &SignedBeaconBlock { match self { BlockWrapper::Block { block } => &block, BlockWrapper::BlockAndBlob { block_sidecar_pair } => &block_sidecar_pair.beacon_block, } } + pub fn block_cloned(&self) -> Arc> { + match self { + BlockWrapper::Block { block } => block.clone(), + BlockWrapper::BlockAndBlob { block_sidecar_pair } => { + block_sidecar_pair.beacon_block.clone() + } + } + } + pub fn blocks_sidecar(&self) -> Option>> { + match self { + BlockWrapper::Block { block: _ } => None, + BlockWrapper::BlockAndBlob { block_sidecar_pair } => { + Some(block_sidecar_pair.blobs_sidecar.clone()) + } + } + } }