From 45897ad4e14ab37dbf671c42b9f856d96c420e93 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Sat, 19 Nov 2022 15:18:42 -0500 Subject: [PATCH] remove blob wrapper --- beacon_node/beacon_chain/src/beacon_chain.rs | 91 ++++++++++--------- .../beacon_chain/src/blob_verification.rs | 8 +- .../beacon_chain/src/block_verification.rs | 58 ++++++------ .../src/rpc/codec/ssz_snappy.rs | 33 ++++--- .../lighthouse_network/src/rpc/methods.rs | 14 ++- .../lighthouse_network/src/rpc/outbound.rs | 8 +- .../lighthouse_network/src/service/mod.rs | 7 +- .../lighthouse_network/src/types/pubsub.rs | 10 +- .../network/src/beacon_processor/mod.rs | 6 +- .../beacon_processor/worker/gossip_methods.rs | 16 ++-- beacon_node/network/src/router/processor.rs | 8 +- .../network/src/sync/block_lookups/mod.rs | 3 +- .../src/sync/block_lookups/parent_lookup.rs | 11 +-- .../sync/block_lookups/single_block_lookup.rs | 3 +- beacon_node/network/src/sync/manager.rs | 5 +- .../state_processing/src/consensus_context.rs | 31 ++++++- .../src/per_block_processing.rs | 3 + .../per_block_processing/eip4844/eip4844.rs | 1 + .../state_processing/src/upgrade/eip4844.rs | 2 +- consensus/types/src/lib.rs | 2 +- consensus/types/src/signed_block_and_blobs.rs | 38 ++------ 21 files changed, 199 insertions(+), 159 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 1f882de71..7ce66d0c9 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -102,12 +102,11 @@ use task_executor::{ShutdownReason, TaskExecutor}; use tree_hash::TreeHash; use types::beacon_state::CloneConfig; use types::*; -use types::signed_block_and_blobs::BlockMaybeBlobs; pub type ForkChoiceError = fork_choice::Error; /// Alias to appease clippy. -type HashBlockTuple = (Hash256, BlockMaybeBlobs); +type HashBlockTuple = (Hash256, Arc>); /// The time-out before failure during an operation to take a read/write RwLock on the block /// processing cache. @@ -924,7 +923,7 @@ impl BeaconChain { pub async fn get_block( &self, block_root: &Hash256, - ) -> Result>, Error> { + ) -> Result>>, Error> { // Load block from database, returning immediately if we have the full block w payload // stored. let blinded_block = match self.store.try_get_full_block(block_root)? { @@ -2198,7 +2197,7 @@ impl BeaconChain { /// This method is potentially long-running and should not run on the core executor. pub fn filter_chain_segment( self: &Arc, - chain_segment: Vec>, + chain_segment: Vec>>, ) -> Result>, ChainSegmentResult> { // This function will never import any blocks. let imported_blocks = 0; @@ -2304,7 +2303,7 @@ impl BeaconChain { /// `Self::process_block`. pub async fn process_chain_segment( self: &Arc, - chain_segment: Vec>, + chain_segment: Vec>>, count_unrealized: CountUnrealized, ) -> ChainSegmentResult { let mut imported_blocks = 0; @@ -2326,9 +2325,8 @@ impl BeaconChain { } }; - while let Some((_root, block_wrapper)) = filtered_chain_segment.first() { - - let block: &SignedBeaconBlock = block_wrapper.block(); + while let Some((_root, block)) = filtered_chain_segment.first() { + let block: &SignedBeaconBlock = block.block(); // Determine the epoch of the first block in the remaining segment. let start_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); @@ -2406,17 +2404,17 @@ impl BeaconChain { /// Returns an `Err` if the given block was invalid, or an error was encountered during pub async fn verify_block_for_gossip( self: &Arc, - block_wrapper: BlockMaybeBlobs, + block: Arc>, ) -> Result, BlockError> { let chain = self.clone(); self.task_executor .clone() .spawn_blocking_handle( move || { - let slot = block_wrapper.block().slot(); - let graffiti_string = block_wrapper.block().message().body().graffiti().as_utf8_lossy(); + let slot = block.block().slot(); + let graffiti_string = block.block().message().body().graffiti().as_utf8_lossy(); - match GossipVerifiedBlock::new(block_wrapper, &chain) { + match GossipVerifiedBlock::new(block, &chain) { Ok(verified) => { debug!( chain.log, @@ -2618,7 +2616,7 @@ impl BeaconChain { #[allow(clippy::too_many_arguments)] fn import_block( &self, - block_wrapper: BlockMaybeBlobs, + block: Arc>, block_root: Hash256, mut state: BeaconState, confirmed_state_roots: Vec, @@ -2627,7 +2625,7 @@ impl BeaconChain { parent_block: SignedBlindedBeaconBlock, parent_eth1_finalization_data: Eth1FinalizationData, ) -> Result> { - let signed_block = block_wrapper.block(); + let signed_block = block.block(); let current_slot = self.slot()?; let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); @@ -2960,6 +2958,10 @@ impl BeaconChain { .collect(); ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutState(block.state_root(), &state)); + + if let Some(blobs) = block.blobs() { + ops.push(StoreOp::PutBlobs(block_root, blobs)); + }; let txn_lock = self.store.hot_db.begin_rw_transaction(); if let Err(e) = self.store.do_atomically(ops) { @@ -3366,7 +3368,7 @@ impl BeaconChain { // // Wait for the execution layer to return an execution payload (if one is required). let prepare_payload_handle = partial_beacon_block.prepare_payload_handle.take(); - let execution_payload = if let Some(prepare_payload_handle) = prepare_payload_handle { + let block_contents = if let Some(prepare_payload_handle) = prepare_payload_handle { prepare_payload_handle .await .map_err(BlockProductionError::TokioJoin)? @@ -3375,9 +3377,6 @@ impl BeaconChain { return Err(BlockProductionError::MissingExecutionPayload); }; - //FIXME(sean) waiting for the BN<>EE api for this to stabilize - let kzg_commitments = vec![]; - // Part 3/3 (blocking) // // Perform the final steps of combining all the parts and computing the state root. @@ -3387,8 +3386,7 @@ impl BeaconChain { move || { chain.complete_partial_beacon_block( partial_beacon_block, - execution_payload, - kzg_commitments, + block_contents, verification, ) }, @@ -3635,7 +3633,6 @@ impl BeaconChain { &self, partial_beacon_block: PartialBeaconBlock, block_contents: BlockProposalContents, - kzg_commitments: Vec, verification: ProduceBlockVerification, ) -> Result, BlockProductionError> { let PartialBeaconBlock { @@ -3739,30 +3736,34 @@ impl BeaconChain { .map_err(|_| BlockProductionError::InvalidPayloadFork)?, }, }), - BeaconState::Eip4844(_) => BeaconBlock::Eip4844(BeaconBlockEip4844 { - slot, - proposer_index, - parent_root, - state_root: Hash256::zero(), - body: BeaconBlockBodyEip4844 { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings: proposer_slashings.into(), - attester_slashings: attester_slashings.into(), - attestations: attestations.into(), - deposits: deposits.into(), - voluntary_exits: voluntary_exits.into(), - sync_aggregate: sync_aggregate - .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: block_contents - .to_payload() - .try_into() - .map_err(|_| BlockProductionError::InvalidPayloadFork)?, - //FIXME(sean) get blobs - blob_kzg_commitments: VariableList::from(kzg_commitments), - }, - }), + BeaconState::Eip4844(_) => { + let kzg_commitments = block_contents + .kzg_commitments() + .ok_or(BlockProductionError::InvalidPayloadFork)?; + BeaconBlock::Eip4844(BeaconBlockEip4844 { + slot, + proposer_index, + parent_root, + state_root: Hash256::zero(), + body: BeaconBlockBodyEip4844 { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings: proposer_slashings.into(), + attester_slashings: attester_slashings.into(), + attestations: attestations.into(), + deposits: deposits.into(), + voluntary_exits: voluntary_exits.into(), + sync_aggregate: sync_aggregate + .ok_or(BlockProductionError::MissingSyncAggregate)?, + execution_payload: block_contents + .to_payload() + .try_into() + .map_err(|_| BlockProductionError::InvalidPayloadFork)?, + blob_kzg_commitments: VariableList::from(kzg_commitments.to_vec()), + }, + }) + } }; let block = SignedBeaconBlock::from_block( diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 0ca1785d0..d43eb7ea7 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; use derivative::Derivative; use slot_clock::SlotClock; +use std::sync::Arc; use crate::beacon_chain::{BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; use crate::BeaconChainError; @@ -80,7 +80,10 @@ impl From for BlobError { } } -pub fn validate_blob_for_gossip(blob_sidecar: &BlobsSidecar, chain: &Arc>) -> Result<(), BlobError>{ +pub fn validate_blob_for_gossip( + blob_sidecar: &BlobsSidecar, + chain: &Arc>, +) -> Result<(), BlobError> { let blob_slot = blob_sidecar.beacon_block_slot; // Do not gossip or process blobs from future or past slots. let latest_permissible_slot = chain @@ -121,4 +124,3 @@ pub fn validate_blob_for_gossip(blob_sidecar: &BlobsSidecar // TODO: `validate_blobs_sidecar` Ok(()) } - diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 15c4c0f52..d5aaaeb1b 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -42,6 +42,7 @@ //! END //! //! ``` +use crate::blob_verification::validate_blob_for_gossip; use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, @@ -83,14 +84,12 @@ use std::time::Duration; use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp}; use task_executor::JoinHandle; use tree_hash::TreeHash; -use types::{BlobsSidecar, ExecPayload, SignedBeaconBlockAndBlobsSidecar}; use types::{ BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch, EthSpec, ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; -use types::signed_block_and_blobs::BlockMaybeBlobs; -use crate::blob_verification::validate_blob_for_gossip; +use types::{BlobsSidecar, ExecPayload, SignedBeaconBlockAndBlobsSidecar}; pub const POS_PANDA_BANNER: &str = r#" ,,, ,,, ,,, ,,, @@ -137,7 +136,7 @@ pub enum BlockError { /// /// It's unclear if this block is valid, but it cannot be processed without already knowing /// its parent. - ParentUnknown(BlockMaybeBlobs), + ParentUnknown(Arc>), /// The block skips too many slots and is a DoS risk. TooManySkippedSlots { parent_slot: Slot, block_slot: Slot }, /// The block slot is greater than the present slot. @@ -526,7 +525,7 @@ fn process_block_slash_info( /// The given `chain_segment` must contain only blocks from the same epoch, otherwise an error /// will be returned. pub fn signature_verify_chain_segment( - mut chain_segment: Vec<(Hash256, BlockMaybeBlobs)>, + mut chain_segment: Vec<(Hash256, Arc>)>, chain: &BeaconChain, ) -> Result>, BlockError> { if chain_segment.is_empty() { @@ -563,6 +562,8 @@ pub fn signature_verify_chain_segment( drop(pubkey_cache); + //FIXME(sean) batch verify kzg blobs + let mut signature_verified_blocks = chain_segment .into_iter() .map(|(block_root, block)| { @@ -591,7 +592,7 @@ pub fn signature_verify_chain_segment( #[derive(Derivative)] #[derivative(Debug(bound = "T: BeaconChainTypes"))] pub struct GossipVerifiedBlock { - pub block: BlockMaybeBlobs, + pub block: Arc>, pub block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, @@ -600,7 +601,7 @@ pub struct GossipVerifiedBlock { /// A wrapper around a `SignedBeaconBlock` that indicates that all signatures (except the deposit /// signatures) have been verified. pub struct SignatureVerifiedBlock { - block: BlockMaybeBlobs, + block: Arc>, block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, @@ -617,12 +618,13 @@ type PayloadVerificationHandle = /// - Signatures /// - State root check /// - Per block processing +/// - Blobs sidecar has been validated if present /// /// Note: a `ExecutionPendingBlock` is not _forever_ valid to be imported, it may later become invalid /// due to finality or some other event. A `ExecutionPendingBlock` should be imported into the /// `BeaconChain` immediately after it is instantiated. pub struct ExecutionPendingBlock { - pub block: BlockMaybeBlobs, + pub block: Arc>, pub block_root: Hash256, pub state: BeaconState, pub parent_block: SignedBeaconBlock>, @@ -667,7 +669,7 @@ impl GossipVerifiedBlock { /// /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn new( - block: BlockMaybeBlobs, + block: Arc>, chain: &BeaconChain, ) -> Result> { // If the block is valid for gossip we don't supply it to the slasher here because @@ -682,10 +684,10 @@ impl GossipVerifiedBlock { /// As for new, but doesn't pass the block to the slasher. fn new_without_slasher_checks( - block_wrapper: BlockMaybeBlobs, + block: Arc>, chain: &BeaconChain, ) -> Result> { - let block = block_wrapper.block(); + let block = block.block(); // Ensure the block is the correct structure for the fork at `block.slot()`. block .fork_name(&chain.spec) @@ -879,9 +881,10 @@ impl GossipVerifiedBlock { // Validate the block's execution_payload (if any). validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?; - if let Some(blobs_sidecar) = block_wrapper.blobs() { - validate_blob_for_gossip(blobs_sidecar, chain)?; - } + //FIXME(sean) + // if let Some(blobs_sidecar) = block.blobs() { + // validate_blob_for_gossip(blobs_sidecar, chain)?; + // } // Having checked the proposer index and the block root we can cache them. let consensus_context = ConsensusContext::new(block.slot()) @@ -889,7 +892,7 @@ impl GossipVerifiedBlock { .set_proposer_index(block.message().proposer_index()); Ok(Self { - block: block_wrapper, + block: block, block_root, parent, consensus_context, @@ -924,7 +927,7 @@ impl SignatureVerifiedBlock { /// /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn new( - block: BlockMaybeBlobs, + block: Arc>, block_root: Hash256, chain: &BeaconChain, ) -> Result> { @@ -970,7 +973,7 @@ impl SignatureVerifiedBlock { /// As for `new` above but producing `BlockSlashInfo`. pub fn check_slashable( - block: BlockMaybeBlobs, + block: Arc>, block_root: Hash256, chain: &BeaconChain, ) -> Result>> { @@ -1064,7 +1067,7 @@ impl IntoExecutionPendingBlock for SignatureVerifiedBloc } } -impl IntoExecutionPendingBlock for BlockMaybeBlobs { +impl IntoExecutionPendingBlock for Arc> { /// Verifies the `SignedBeaconBlock` by first transforming it into a `SignatureVerifiedBlock` /// and then using that implementation of `IntoExecutionPendingBlock` to complete verification. fn into_execution_pending_block_slashable( @@ -1097,13 +1100,13 @@ impl ExecutionPendingBlock { /// /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn from_signature_verified_components( - block_wrapper: BlockMaybeBlobs, + block: Arc>, block_root: Hash256, parent: PreProcessingSnapshot, mut consensus_context: ConsensusContext, chain: &Arc>, ) -> Result> { - let block = block_wrapper.block(); + let block = block.block(); if let Some(parent) = chain .canonical_head @@ -1128,7 +1131,7 @@ impl ExecutionPendingBlock { // because it will revert finalization. Note that the finalized block is stored in fork // choice, so we will not reject any child of the finalized block (this is relevant during // genesis). - return Err(BlockError::ParentUnknown(block_wrapper)); + return Err(BlockError::ParentUnknown(block)); } // Reject any block that exceeds our limit on skipped slots. @@ -1545,7 +1548,7 @@ pub fn check_block_is_finalized_descendant( }) } else { //FIXME(sean) does this matter if it only returns a block? - Err(BlockError::ParentUnknown(BlockMaybeBlobs::Block(block.clone()))) + Err(BlockError::ParentUnknown(block.clone())) } } } @@ -1637,16 +1640,15 @@ fn verify_parent_block_is_known( #[allow(clippy::type_complexity)] fn load_parent( block_root: Hash256, - block_wrapper: BlockMaybeBlobs, + block: Arc>, chain: &BeaconChain, ) -> Result< ( PreProcessingSnapshot, - BlockMaybeBlobs, + Arc>, ), BlockError, > { - let block = block_wrapper.block(); let spec = &chain.spec; // Reject any block if its parent is not known to fork choice. @@ -1664,7 +1666,7 @@ fn load_parent( .fork_choice_read_lock() .contains_block(&block.parent_root()) { - return Err(BlockError::ParentUnknown(block_wrapper)); + return Err(BlockError::ParentUnknown(block)); } let block_delay = chain @@ -1703,7 +1705,7 @@ fn load_parent( "block_delay" => ?block_delay, ); } - Ok((snapshot, block_wrapper)) + Ok((snapshot, block)) } else { // Load the blocks parent block from the database, returning invalid if that block is not // found. @@ -1750,7 +1752,7 @@ fn load_parent( pre_state: parent_state, beacon_state_root: Some(parent_state_root), }, - block_wrapper, + beacon_block, )) }; diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index f6b99881b..ef06b2b71 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -15,7 +15,11 @@ use std::io::{Read, Write}; use std::marker::PhantomData; use std::sync::Arc; use tokio_util::codec::{Decoder, Encoder}; -use types::{BlobsSidecar, EthSpec, ForkContext, ForkName, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockEip4844, SignedBeaconBlockMerge}; +use types::{ + BlobsSidecar, EthSpec, ForkContext, ForkName, SignedBeaconBlock, SignedBeaconBlockAltair, + SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockBase, SignedBeaconBlockCapella, + SignedBeaconBlockEip4844, SignedBeaconBlockMerge, +}; use unsigned_varint::codec::Uvi; const CONTEXT_BYTES_LEN: usize = 4; @@ -311,8 +315,11 @@ impl Decoder for SSZSnappyOutboundCodec { let _read_bytes = src.split_to(n as usize); match self.protocol.version { - Version::V1 => handle_v1_response(self.protocol.message_name, &decoded_buffer, - &mut self.fork_name, ), + Version::V1 => handle_v1_response( + self.protocol.message_name, + &decoded_buffer, + &mut self.fork_name, + ), Version::V2 => handle_v2_response( self.protocol.message_name, &decoded_buffer, @@ -482,11 +489,9 @@ fn handle_v1_request( Protocol::BlobsByRange => Ok(Some(InboundRequest::BlobsByRange( BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?, ))), - Protocol::BlobsByRoot => Ok(Some(InboundRequest::BlobsByRoot( - BlobsByRootRequest{ - block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, - }, - ))), + Protocol::BlobsByRoot => Ok(Some(InboundRequest::BlobsByRoot(BlobsByRootRequest { + block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, + }))), Protocol::Ping => Ok(Some(InboundRequest::Ping(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), @@ -576,7 +581,7 @@ fn handle_v1_response( "Invalid forkname for blobsbyrange".to_string(), )), } - }, + } Protocol::BlobsByRoot => { let fork_name = fork_name.take().ok_or_else(|| { RPCError::ErrorResponse( @@ -593,7 +598,7 @@ fn handle_v1_response( "Invalid forkname for blobsbyroot".to_string(), )), } - }, + } Protocol::Ping => Ok(Some(RPCResponse::Pong(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), @@ -678,8 +683,12 @@ fn handle_v2_response( )?), )))), }, - Protocol::BlobsByRange => Err(RPCError::InvalidData("blobs by range via v2".to_string())), - Protocol::BlobsByRoot => Err(RPCError::InvalidData("blobs by range via v2".to_string())), + Protocol::BlobsByRange => { + Err(RPCError::InvalidData("blobs by range via v2".to_string())) + } + Protocol::BlobsByRoot => { + Err(RPCError::InvalidData("blobs by range via v2".to_string())) + } _ => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, "Invalid v2 request".to_string(), diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 65e0293b8..2a2d12f3f 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -13,8 +13,8 @@ use std::sync::Arc; use strum::IntoStaticStr; use superstruct::superstruct; use types::blobs_sidecar::BlobsSidecar; -use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; use types::SignedBeaconBlockAndBlobsSidecar; +use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// Maximum number of blocks in a single request. pub type MaxRequestBlocks = U1024; @@ -427,10 +427,18 @@ impl std::fmt::Display for RPCResponse { write!(f, "BlocksByRoot: Block slot: {}", block.slot()) } RPCResponse::BlobsByRange(blob) => { - write!(f, "BlobsByRange: Blob slot: {}", blob.blobs_sidecar.beacon_block_slot) + write!( + f, + "BlobsByRange: Blob slot: {}", + blob.blobs_sidecar.beacon_block_slot + ) } RPCResponse::BlobsByRoot(blob) => { - write!(f, "BlobsByRoot: Blob slot: {}", blob.blobs_sidecar.beacon_block_slot) + write!( + f, + "BlobsByRoot: Blob slot: {}", + blob.blobs_sidecar.beacon_block_slot + ) } RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data), RPCResponse::MetaData(metadata) => write!(f, "Metadata: {}", metadata.seq_number()), diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 0de0eddea..8d301fdc1 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -82,9 +82,11 @@ impl OutboundRequest { Version::V1, Encoding::SSZSnappy, )], - OutboundRequest::BlobsByRoot(_) => vec![ - ProtocolId::new(Protocol::BlobsByRoot, Version::V1, Encoding::SSZSnappy), - ], + OutboundRequest::BlobsByRoot(_) => vec![ProtocolId::new( + Protocol::BlobsByRoot, + Version::V1, + Encoding::SSZSnappy, + )], OutboundRequest::Ping(_) => vec![ProtocolId::new( Protocol::Ping, Version::V1, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index b06134a8f..4c486e050 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1264,11 +1264,8 @@ impl Network { Some(event) } InboundRequest::BlobsByRoot(req) => { - let event = self.build_request( - peer_request_id, - peer_id, - Request::BlobsByRoot(req), - ); + let event = + self.build_request(peer_request_id, peer_id, Request::BlobsByRoot(req)); Some(event) } } diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 9663c06b6..fdc4696e9 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -11,7 +11,13 @@ use std::boxed::Box; use std::io::{Error, ErrorKind}; use std::sync::Arc; use tree_hash_derive::TreeHash; -use types::{Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ForkContext, ForkName, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockEip4844, SignedBeaconBlockMerge, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId}; +use types::{ + Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ForkContext, ForkName, ProposerSlashing, + SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAltair, + SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockBase, SignedBeaconBlockCapella, + SignedBeaconBlockEip4844, SignedBeaconBlockMerge, SignedContributionAndProof, + SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, +}; #[derive(Debug, Clone, PartialEq)] pub enum PubsubMessage { @@ -281,7 +287,7 @@ impl std::fmt::Display for PubsubMessage { PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blob) => write!( f, "Beacon block and Blobs Sidecar: slot: {}, blobs: {}", - block_and_blob.beacon_block.message().slot(), + block_and_blob.beacon_block.message.slot, block_and_blob.blobs_sidecar.blobs.len(), ), PubsubMessage::AggregateAndProofAttestation(att) => write!( diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index cf3bba65b..1dd4629dd 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -61,7 +61,11 @@ use std::time::Duration; use std::{cmp, collections::HashSet}; use task_executor::TaskExecutor; use tokio::sync::mpsc; -use types::{Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId}; +use types::{ + Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, + SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedContributionAndProof, + SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, +}; use work_reprocessing_queue::{ spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, }; diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index d2391c2cf..b55246071 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -10,9 +10,7 @@ use beacon_chain::{ BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, ForkChoiceError, GossipVerifiedBlock, }; -use lighthouse_network::{ - Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource, -}; +use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use ssz::Encode; @@ -20,8 +18,12 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; -use types::{Attestation, AttesterSlashing, BlobsSidecar, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId}; -use types::signed_block_and_blobs::BlockMaybeBlobs; +use types::{ + Attestation, AttesterSlashing, BlobsSidecar, EthSpec, Hash256, IndexedAttestation, + ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, + SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, + SyncSubnetId, +}; use super::{ super::work_reprocessing_queue::{ @@ -655,7 +657,7 @@ impl Worker { message_id: MessageId, peer_id: PeerId, peer_client: Client, - block: BlockMaybeBlobs, + block: Arc>, reprocess_tx: mpsc::Sender>, duplicate_cache: DuplicateCache, seen_duration: Duration, @@ -702,7 +704,7 @@ impl Worker { message_id: MessageId, peer_id: PeerId, peer_client: Client, - block: BlockMaybeBlobs, + block: Arc>, reprocess_tx: mpsc::Sender>, seen_duration: Duration, ) -> Option> { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index faa451808..432b11b88 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -7,7 +7,7 @@ use crate::sync::manager::RequestId as SyncId; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; -use lighthouse_network::{rpc::*}; +use lighthouse_network::rpc::*; use lighthouse_network::{ Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response, }; @@ -17,7 +17,11 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::SyncCommitteeMessage; use tokio::sync::mpsc; -use types::{Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId}; +use types::{ + Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ProposerSlashing, + SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, + SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId, +}; /// Processes validated messages from the network. It relays necessary data to the syncing thread /// and processes blocks from the pubsub network. diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 1b70cf7aa..a5a5eb2d1 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -9,7 +9,6 @@ use slog::{debug, error, trace, warn, Logger}; use smallvec::SmallVec; use std::sync::Arc; use store::{Hash256, SignedBeaconBlock}; -use types::signed_block_and_blobs::BlockMaybeBlobs; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; @@ -31,7 +30,7 @@ mod single_block_lookup; #[cfg(test)] mod tests; -pub type RootBlockTuple = (Hash256, BlockMaybeBlobs); +pub type RootBlockTuple = (Hash256, Arc>); const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index cbd8ee243..38ad59ebc 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -4,7 +4,6 @@ use lighthouse_network::PeerId; use std::sync::Arc; use store::{Hash256, SignedBeaconBlock}; use strum::IntoStaticStr; -use types::signed_block_and_blobs::BlockMaybeBlobs; use crate::sync::{ manager::{Id, SLOT_IMPORT_TOLERANCE}, @@ -25,7 +24,7 @@ pub(crate) struct ParentLookup { /// The root of the block triggering this parent request. chain_hash: Hash256, /// The blocks that have currently been downloaded. - downloaded_blocks: Vec>, + downloaded_blocks: Vec>>, /// Request of the last parent. current_parent_request: SingleBlockRequest, /// Id of the last parent request. @@ -62,7 +61,7 @@ impl ParentLookup { pub fn new( block_root: Hash256, - block: BlockMaybeBlobs, + block: Arc>, peer_id: PeerId, ) -> Self { let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id); @@ -99,7 +98,7 @@ impl ParentLookup { self.current_parent_request.check_peer_disconnected(peer_id) } - pub fn add_block(&mut self, block: BlockMaybeBlobs) { + pub fn add_block(&mut self, block: Arc>) { let next_parent = block.parent_root(); self.downloaded_blocks.push(block); self.current_parent_request.hash = next_parent; @@ -126,7 +125,7 @@ impl ParentLookup { self.current_parent_request_id = None; } - pub fn chain_blocks(&mut self) -> Vec> { + pub fn chain_blocks(&mut self) -> Vec>> { std::mem::take(&mut self.downloaded_blocks) } @@ -134,7 +133,7 @@ impl ParentLookup { /// the processing result of the block. pub fn verify_block( &mut self, - block: Option>, + block: Option>>, failed_chains: &mut lru_cache::LRUTimeCache, ) -> Result>, VerifyError> { let root_and_block = self.current_parent_request.verify_block(block)?; diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 6459fe05e..256a2b429 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -8,7 +8,6 @@ use rand::seq::IteratorRandom; use ssz_types::VariableList; use store::{EthSpec, Hash256, SignedBeaconBlock}; use strum::IntoStaticStr; -use types::signed_block_and_blobs::BlockMaybeBlobs; /// Object representing a single block lookup request. #[derive(PartialEq, Eq)] @@ -106,7 +105,7 @@ impl SingleBlockRequest { /// Returns the block for processing if the response is what we expected. pub fn verify_block( &mut self, - block: Option>, + block: Option>>, ) -> Result>, VerifyError> { match self.state { State::AwaitingDownload => { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 4f2e60977..230a67fcf 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -53,7 +53,6 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::signed_block_and_blobs::BlockMaybeBlobs; use types::{ BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot, }; @@ -104,12 +103,12 @@ pub enum SyncMessage { RpcBlock { request_id: RequestId, peer_id: PeerId, - beacon_block: Option>, + beacon_block: Option>>, seen_timestamp: Duration, }, /// A block with an unknown parent has been received. - UnknownBlock(PeerId, BlockMaybeBlobs, Hash256), + UnknownBlock(PeerId, Arc>, Hash256), /// A peer has sent an object that references a block that is unknown. This triggers the /// manager to attempt to find the block matching the unknown hash. diff --git a/consensus/state_processing/src/consensus_context.rs b/consensus/state_processing/src/consensus_context.rs index 18ae5ad3b..4ae1e7e23 100644 --- a/consensus/state_processing/src/consensus_context.rs +++ b/consensus/state_processing/src/consensus_context.rs @@ -1,8 +1,8 @@ use std::marker::PhantomData; use tree_hash::TreeHash; use types::{ - AbstractExecPayload, BeaconState, BeaconStateError, ChainSpec, EthSpec, ExecPayload, Hash256, - SignedBeaconBlock, Slot, + AbstractExecPayload, BeaconState, BeaconStateError, BlobsSidecar, ChainSpec, EthSpec, + ExecPayload, Hash256, SignedBeaconBlock, Slot, }; #[derive(Debug)] @@ -13,6 +13,12 @@ pub struct ConsensusContext { proposer_index: Option, /// Block root of the block at `slot`. current_block_root: Option, + /// Should only be populated if the sidecar has not been validated. + blobs_sidecar: Option>>, + /// Whether `validate_blobs_sidecar` has successfully passed. + blobs_sidecar_validated: bool, + /// Whether `verify_kzg_commitments_against_transactions` has successfully passed. + blobs_verified_vs_txs: bool, _phantom: PhantomData, } @@ -34,6 +40,9 @@ impl ConsensusContext { slot, proposer_index: None, current_block_root: None, + blobs_sidecar: None, + blobs_sidecar_validated: false, + blobs_verified_vs_txs: false, _phantom: PhantomData, } } @@ -89,4 +98,22 @@ impl ConsensusContext { }) } } + + pub fn set_blobs_sidecar_validated(mut self, blobs_sidecar_validated: bool) -> Self { + self.blobs_sidecar_validated = blobs_sidecar_validated; + self + } + + pub fn set_blobs_verified_vs_txs(mut self, blobs_verified_vs_txs: bool) -> Self { + self.blobs_verified_vs_txs = blobs_verified_vs_txs; + self + } + + pub fn blobs_sidecar_validated(&self) -> bool { + self.blobs_sidecar_validated + } + + pub fn blobs_verified_vs_txs(&self) -> bool { + self.blobs_verified_vs_txs + } } diff --git a/consensus/state_processing/src/per_block_processing.rs b/consensus/state_processing/src/per_block_processing.rs index 255a4892a..7a093d558 100644 --- a/consensus/state_processing/src/per_block_processing.rs +++ b/consensus/state_processing/src/per_block_processing.rs @@ -180,6 +180,9 @@ pub fn per_block_processing>( process_blob_kzg_commitments(block.body())?; + //FIXME(sean) add `validate_blobs_sidecar` (is_data_available) and only run it if the consensus + // context tells us it wasnt already run + Ok(()) } diff --git a/consensus/state_processing/src/per_block_processing/eip4844/eip4844.rs b/consensus/state_processing/src/per_block_processing/eip4844/eip4844.rs index 56b3ed58a..284d0d0d6 100644 --- a/consensus/state_processing/src/per_block_processing/eip4844/eip4844.rs +++ b/consensus/state_processing/src/per_block_processing/eip4844/eip4844.rs @@ -18,6 +18,7 @@ pub fn process_blob_kzg_commitments> block_body.blob_kzg_commitments(), ) { if let Some(transactions) = payload.transactions() { + //FIXME(sean) only run if this wasn't run in gossip (use consensus context) if !verify_kzg_commitments_against_transactions::(transactions, kzg_commitments)? { return Err(BlockProcessingError::BlobVersionHashMismatch); } diff --git a/consensus/state_processing/src/upgrade/eip4844.rs b/consensus/state_processing/src/upgrade/eip4844.rs index ce88364f0..78fb16033 100644 --- a/consensus/state_processing/src/upgrade/eip4844.rs +++ b/consensus/state_processing/src/upgrade/eip4844.rs @@ -11,7 +11,7 @@ pub fn upgrade_to_eip4844( // FIXME(sean) This is a hack to let us participate in testnets where capella doesn't exist. // if we are disabling withdrawals, assume we should fork off of bellatrix. - let previous_fork_version = if cfg!(feature ="withdrawals") { + let previous_fork_version = if cfg!(feature = "withdrawals") { pre.fork.current_version } else { spec.bellatrix_fork_version diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 55da9cab0..e970b17c9 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -150,7 +150,6 @@ pub use crate::historical_batch::HistoricalBatch; pub use crate::indexed_attestation::IndexedAttestation; pub use crate::kzg_commitment::KzgCommitment; pub use crate::kzg_proof::KzgProof; -pub use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobsSidecar; pub use crate::participation_flags::ParticipationFlags; pub use crate::participation_list::ParticipationList; pub use crate::payload::{ @@ -172,6 +171,7 @@ pub use crate::signed_beacon_block::{ SignedBlindedBeaconBlock, }; pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader; +pub use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobsSidecar; pub use crate::signed_contribution_and_proof::SignedContributionAndProof; pub use crate::signed_voluntary_exit::SignedVoluntaryExit; pub use crate::signing_data::{SignedRoot, SigningData}; diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs index 1dd1ca1cf..5e58a0890 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -1,37 +1,13 @@ -use std::sync::Arc; -use serde_derive::{Deserialize, Serialize}; -use ssz::{Decode, DecodeError}; -use ssz_derive::{Decode, Encode}; -use tree_hash_derive::TreeHash; use crate::{BlobsSidecar, EthSpec, SignedBeaconBlock, SignedBeaconBlockEip4844}; +use serde_derive::{Deserialize, Serialize}; +use ssz::{Decode, DecodeError, Encode}; +use ssz_derive::{Decode, Encode}; +use std::sync::Arc; +use tree_hash_derive::TreeHash; -#[derive(Debug, Clone, Serialize, Deserialize, Encode, TreeHash, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq)] #[serde(bound = "T: EthSpec")] pub struct SignedBeaconBlockAndBlobsSidecar { - pub beacon_block: SignedBeaconBlock, + pub beacon_block: SignedBeaconBlockEip4844, pub blobs_sidecar: BlobsSidecar, } - -impl Decode for SignedBeaconBlockAndBlobsSidecar { - fn is_ssz_fixed_len() -> bool { - todo!() - } - - fn from_ssz_bytes(bytes: &[u8]) -> Result { - todo!() - } -} - -pub enum BlockMaybeBlobs { - Block(Arc>), - BlockAndBlobs(Arc>), -} - -impl BlockMaybeBlobs { - pub fn blobs(&self) -> Option<&BlobsSidecar>{ - match self { - Self::Block(_) => None, - Self::BlockAndBlobs(block_and_blobs) => Some(&block_and_blobs.blobs_sidecar) - } - } -} \ No newline at end of file