From 5e11edc61202c6a3e5c7691904917a630516bd49 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 23 Dec 2022 12:47:38 -0500 Subject: [PATCH 1/8] fix blob validation for empty blobs --- .../beacon_chain/src/blob_verification.rs | 7 ++ .../beacon_chain/src/block_verification.rs | 23 ++++--- beacon_node/http_api/src/publish_blocks.rs | 6 +- .../network/src/beacon_processor/mod.rs | 4 +- .../beacon_processor/worker/sync_methods.rs | 6 +- .../src/sync/block_sidecar_coupling.rs | 10 ++- beacon_node/network/src/sync/manager.rs | 24 ++++--- .../network/src/sync/range_sync/batch.rs | 8 +-- .../state_processing/src/consensus_context.rs | 3 - consensus/types/src/blobs_sidecar.rs | 9 +++ consensus/types/src/signed_beacon_block.rs | 27 ++++++++ consensus/types/src/signed_block_and_blobs.rs | 65 ++++++++----------- crypto/kzg/src/kzg_proof.rs | 8 +++ 13 files changed, 119 insertions(+), 81 deletions(-) diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 1b05c7d39..3e5f5e740 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -5,6 +5,7 @@ use crate::{kzg_utils, BeaconChainError}; use bls::PublicKey; use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions; use types::consts::eip4844::BLS_MODULUS; +use types::signed_beacon_block::BlobReconstructionError; use types::{BeaconStateError, BlobsSidecar, Hash256, KzgCommitment, Slot, Transactions}; #[derive(Debug)] @@ -91,6 +92,12 @@ pub enum BlobError { MissingBlobs, } +impl From for BlobError { + fn from(_: BlobReconstructionError) -> Self { + BlobError::MissingBlobs + } +} + impl From for BlobError { fn from(e: BeaconChainError) -> Self { BlobError::BeaconChainError(e) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index b9e65bc0a..6f330b8ea 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -87,6 +87,7 @@ use std::time::Duration; use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp}; use task_executor::JoinHandle; use tree_hash::TreeHash; +use types::signed_beacon_block::BlobReconstructionError; use types::signed_block_and_blobs::BlockWrapper; use types::{ BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch, @@ -479,6 +480,12 @@ impl From for BlockError { } } +impl From for BlockError { + fn from(e: BlobReconstructionError) -> Self { + BlockError::BlobValidation(BlobError::from(e)) + } +} + /// Stores information about verifying a payload against an execution engine. pub struct PayloadVerificationOutcome { pub payload_verification_status: PayloadVerificationStatus, @@ -905,7 +912,7 @@ impl GossipVerifiedBlock { // Validate the block's execution_payload (if any). validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?; - if let Some(blobs_sidecar) = block.blobs() { + if let Some(blobs_sidecar) = block.blobs(Some(block_root))? { let kzg_commitments = block .message() .body() @@ -919,7 +926,7 @@ impl GossipVerifiedBlock { .map_err(|_| BlockError::BlobValidation(BlobError::TransactionsMissing))? 
.ok_or(BlockError::BlobValidation(BlobError::TransactionsMissing))?; validate_blob_for_gossip( - blobs_sidecar, + &blobs_sidecar, kzg_commitments, transactions, block.slot(), @@ -1134,12 +1141,8 @@ impl IntoExecutionPendingBlock for Arc &SignedBeaconBlock { @@ -1563,7 +1566,7 @@ impl ExecutionPendingBlock { if let Some(data_availability_boundary) = chain.data_availability_boundary() { if block_slot.epoch(T::EthSpec::slots_per_epoch()) >= data_availability_boundary { let sidecar = block - .blobs() + .blobs(Some(block_root))? .ok_or(BlockError::BlobValidation(BlobError::MissingBlobs))?; let kzg = chain.kzg.as_ref().ok_or(BlockError::BlobValidation( BlobError::TrustedSetupNotInitialized, @@ -1586,7 +1589,7 @@ impl ExecutionPendingBlock { block.slot(), block_root, kzg_commitments, - sidecar, + &sidecar, ) .map_err(|e| BlockError::BlobValidation(BlobError::KzgError(e)))? { diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 085f5036f..f024e49e2 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -43,9 +43,7 @@ pub async fn publish_block( network_tx, PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs.clone()), )?; - BlockWrapper::BlockAndBlob { - block_sidecar_pair: block_and_blobs, - } + BlockWrapper::BlockAndBlob(block_and_blobs) } else { //FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required return Err(warp_utils::reject::broadcast_without_import(format!( @@ -54,7 +52,7 @@ pub async fn publish_block( } } else { crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?; - BlockWrapper::Block { block } + BlockWrapper::Block(block) }; // Determine the delay after the start of the slot, register it with metrics. 
diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 887ecc497..9de006c84 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -1699,7 +1699,7 @@ impl BeaconProcessor { message_id, peer_id, peer_client, - BlockWrapper::Block { block }, + BlockWrapper::Block(block), work_reprocessing_tx, duplicate_cache, seen_timestamp, @@ -1721,7 +1721,7 @@ impl BeaconProcessor { message_id, peer_id, peer_client, - BlockWrapper::BlockAndBlob { block_sidecar_pair }, + BlockWrapper::BlockAndBlob(block_sidecar_pair), work_reprocessing_tx, duplicate_cache, seen_timestamp, diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index dafc00bdd..b9ad82cf8 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -194,11 +194,9 @@ impl Worker { let unwrapped = downloaded_blocks .into_iter() .map(|block| match block { - BlockWrapper::Block { block } => block, + BlockWrapper::Block(block) => block, //FIXME(sean) handle blobs in backfill - BlockWrapper::BlockAndBlob { - block_sidecar_pair: _, - } => todo!(), + BlockWrapper::BlockAndBlob(_) => todo!(), }) .collect(); diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 95acadffb..f82417db3 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -51,16 +51,14 @@ impl BlockBlobRequestInfo { { let blobs_sidecar = accumulated_sidecars.pop_front().ok_or("missing sidecar")?; - Ok(BlockWrapper::BlockAndBlob { - block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar { + Ok(BlockWrapper::BlockAndBlob( + SignedBeaconBlockAndBlobsSidecar { beacon_block, blobs_sidecar, }, - }) + )) } else { - Ok(BlockWrapper::Block { - block: beacon_block, - }) + Ok(BlockWrapper::Block(beacon_block)) } }) .collect::, _>>(); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 305f9f706..109ed8194 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -734,14 +734,14 @@ impl SyncManager { RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( id, peer_id, - beacon_block.map(|block| BlockWrapper::Block { block }), + beacon_block.map(|block| BlockWrapper::Block(block)), seen_timestamp, &mut self.network, ), RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( id, peer_id, - beacon_block.map(|block| BlockWrapper::Block { block }), + beacon_block.map(|block| BlockWrapper::Block(block)), seen_timestamp, &mut self.network, ), @@ -756,7 +756,7 @@ impl SyncManager { batch_id, &peer_id, id, - beacon_block.map(|block| BlockWrapper::Block { block }), + beacon_block.map(|block| BlockWrapper::Block(block)), ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::Successful) => {} @@ -780,7 +780,7 @@ impl SyncManager { chain_id, batch_id, id, - beacon_block.map(|block| BlockWrapper::Block { block }), + beacon_block.map(|block| BlockWrapper::Block(block)), ); self.update_sync_state(); } @@ -930,9 +930,11 @@ impl SyncManager { RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( id, peer_id, - block_sidecar_pair.map(|block_sidecar_pair| 
BlockWrapper::BlockAndBlob { - // TODO: why is this in an arc - block_sidecar_pair: (*block_sidecar_pair).clone(), + block_sidecar_pair.map(|block_sidecar_pair| { + BlockWrapper::BlockAndBlob( + // TODO: why is this in an arc + (*block_sidecar_pair).clone(), + ) }), seen_timestamp, &mut self.network, @@ -940,9 +942,11 @@ impl SyncManager { RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( id, peer_id, - block_sidecar_pair.map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { - // TODO: why is this in an arc - block_sidecar_pair: (*block_sidecar_pair).clone(), + block_sidecar_pair.map(|block_sidecar_pair| { + BlockWrapper::BlockAndBlob( + // TODO: why is this in an arc + (*block_sidecar_pair).clone(), + ) }), seen_timestamp, &mut self.network, diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index b0d266e07..f7e594598 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -25,11 +25,11 @@ impl BatchTy { match self { BatchTy::Blocks(blocks) => blocks .into_iter() - .map(|block| BlockWrapper::Block { block }) + .map(|block| BlockWrapper::Block(block)) .collect(), BatchTy::BlocksAndBlobs(block_sidecar_pair) => block_sidecar_pair .into_iter() - .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair }) + .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob(block_sidecar_pair)) .collect(), } } @@ -413,7 +413,7 @@ impl BatchInfo { match self.batch_type { ExpectedBatchTy::OnlyBlockBlobs => { let blocks = blocks.into_iter().map(|block| { - let BlockWrapper::BlockAndBlob { block_sidecar_pair: block_and_blob } = block else { + let BlockWrapper::BlockAndBlob(block_and_blob) = block else { panic!("Batches should never have a mixed type. This is a bug. Contact D") }; block_and_blob @@ -422,7 +422,7 @@ impl BatchInfo { } ExpectedBatchTy::OnlyBlock => { let blocks = blocks.into_iter().map(|block| { - let BlockWrapper::Block { block } = block else { + let BlockWrapper::Block(block) = block else { panic!("Batches should never have a mixed type. This is a bug. Contact D") }; block diff --git a/consensus/state_processing/src/consensus_context.rs b/consensus/state_processing/src/consensus_context.rs index f5585426c..3a5aebc47 100644 --- a/consensus/state_processing/src/consensus_context.rs +++ b/consensus/state_processing/src/consensus_context.rs @@ -21,8 +21,6 @@ pub struct ConsensusContext { /// Cache of indexed attestations constructed during block processing. indexed_attestations: HashMap<(AttestationData, BitList), IndexedAttestation>, - /// Should only be populated if the sidecar has not been validated. - blobs_sidecar: Option>>, /// Whether `validate_blobs_sidecar` has successfully passed. blobs_sidecar_validated: bool, /// Whether `verify_kzg_commitments_against_transactions` has successfully passed. 
@@ -49,7 +47,6 @@ impl ConsensusContext { proposer_index: None, current_block_root: None, indexed_attestations: HashMap::new(), - blobs_sidecar: None, blobs_sidecar_validated: false, blobs_verified_vs_txs: false, } diff --git a/consensus/types/src/blobs_sidecar.rs b/consensus/types/src/blobs_sidecar.rs index f1e2a4bb1..d522227a6 100644 --- a/consensus/types/src/blobs_sidecar.rs +++ b/consensus/types/src/blobs_sidecar.rs @@ -28,6 +28,15 @@ impl BlobsSidecar { Self::default() } + pub fn empty_from_parts(beacon_block_root: Hash256, beacon_block_slot: Slot) -> Self { + Self { + beacon_block_root, + beacon_block_slot, + blobs: VariableList::empty(), + kzg_aggregated_proof: KzgProof::empty(), + } + } + #[allow(clippy::integer_arithmetic)] pub fn max_size() -> usize { // Fixed part diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index 14f9358f6..89ccb95a1 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -36,6 +36,10 @@ impl From for Hash256 { } } +pub enum BlobReconstructionError { + BlobsMissing, +} + /// A `BeaconBlock` and a signature from its proposer. #[superstruct( variants(Base, Altair, Merge, Capella, Eip4844), @@ -235,6 +239,29 @@ impl> SignedBeaconBlock pub fn canonical_root(&self) -> Hash256 { self.message().tree_hash_root() } + + /// Reconstructs an empty `BlobsSidecar`, using the given block root if provided, else calculates it. + /// If this block has kzg commitments, an error will be returned. If this block is from prior to the + /// Eip4844 fork, `None` will be returned. + pub fn reconstruct_empty_blobs( + &self, + block_root_opt: Option, + ) -> Result>, BlobReconstructionError> { + self.message() + .body() + .blob_kzg_commitments() + .map(|kzg_commitments| { + if kzg_commitments.len() > 0 { + Err(BlobReconstructionError::BlobsMissing) + } else { + Ok(Some(BlobsSidecar::empty_from_parts( + block_root_opt.unwrap_or(self.canonical_root()), + self.slot(), + ))) + } + }) + .unwrap_or(Ok(None)) + } } // We can convert pre-Bellatrix blocks without payloads into blocks with payloads. diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs index 09ff89e7b..4fcd09de4 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -1,3 +1,4 @@ +use crate::signed_beacon_block::BlobReconstructionError; use crate::{BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockEip4844, Slot}; use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, DecodeError}; @@ -35,60 +36,52 @@ impl SignedBeaconBlockAndBlobsSidecar { /// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobsSidecar`]. 
#[derive(Clone, Debug)] pub enum BlockWrapper { - Block { - block: Arc>, - }, - BlockAndBlob { - block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar, - }, + Block(Arc>), + BlockAndBlob(SignedBeaconBlockAndBlobsSidecar), } impl BlockWrapper { pub fn slot(&self) -> Slot { match self { - BlockWrapper::Block { block } => block.slot(), - BlockWrapper::BlockAndBlob { block_sidecar_pair } => { + BlockWrapper::Block(block) => block.slot(), + BlockWrapper::BlockAndBlob(block_sidecar_pair) => { block_sidecar_pair.beacon_block.slot() } } } pub fn block(&self) -> &SignedBeaconBlock { match self { - BlockWrapper::Block { block } => &block, - BlockWrapper::BlockAndBlob { block_sidecar_pair } => &block_sidecar_pair.beacon_block, + BlockWrapper::Block(block) => &block, + BlockWrapper::BlockAndBlob(block_sidecar_pair) => &block_sidecar_pair.beacon_block, } } pub fn block_cloned(&self) -> Arc> { match self { - BlockWrapper::Block { block } => block.clone(), - BlockWrapper::BlockAndBlob { block_sidecar_pair } => { + BlockWrapper::Block(block) => block.clone(), + BlockWrapper::BlockAndBlob(block_sidecar_pair) => { block_sidecar_pair.beacon_block.clone() } } } - pub fn blobs(&self) -> Option<&BlobsSidecar> { + pub fn blobs( + &self, + block_root: Option, + ) -> Result>>, BlobReconstructionError> { match self { - BlockWrapper::Block { .. } => None, - BlockWrapper::BlockAndBlob { block_sidecar_pair } => { - Some(&block_sidecar_pair.blobs_sidecar) - } - } - } - - pub fn blobs_cloned(&self) -> Option>> { - match self { - BlockWrapper::Block { block: _ } => None, - BlockWrapper::BlockAndBlob { block_sidecar_pair } => { - Some(block_sidecar_pair.blobs_sidecar.clone()) + BlockWrapper::Block(block) => block + .reconstruct_empty_blobs(block_root) + .map(|blob_opt| blob_opt.map(Arc::new)), + BlockWrapper::BlockAndBlob(block_sidecar_pair) => { + Ok(Some(block_sidecar_pair.blobs_sidecar.clone())) } } } pub fn message(&self) -> crate::BeaconBlockRef { match self { - BlockWrapper::Block { block } => block.message(), - BlockWrapper::BlockAndBlob { block_sidecar_pair } => { + BlockWrapper::Block(block) => block.message(), + BlockWrapper::BlockAndBlob(block_sidecar_pair) => { block_sidecar_pair.beacon_block.message() } } @@ -100,8 +93,8 @@ impl BlockWrapper { pub fn deconstruct(self) -> (Arc>, Option>>) { match self { - BlockWrapper::Block { block } => (block, None), - BlockWrapper::BlockAndBlob { block_sidecar_pair } => { + BlockWrapper::Block(block) => (block, None), + BlockWrapper::BlockAndBlob(block_sidecar_pair) => { let SignedBeaconBlockAndBlobsSidecar { beacon_block, blobs_sidecar, @@ -112,29 +105,25 @@ impl BlockWrapper { } } -// TODO: probably needes to be changed. This is needed because SignedBeaconBlockAndBlobsSidecar +// FIXME(sean): probably needs to be changed. 
This is needed because SignedBeaconBlockAndBlobsSidecar // does not implement Hash impl std::hash::Hash for BlockWrapper { fn hash(&self, state: &mut H) { match self { - BlockWrapper::Block { block } => block.hash(state), - BlockWrapper::BlockAndBlob { - block_sidecar_pair: block_and_blob, - } => block_and_blob.beacon_block.hash(state), + BlockWrapper::Block(block) => block.hash(state), + BlockWrapper::BlockAndBlob(block_and_blob) => block_and_blob.beacon_block.hash(state), } } } impl From> for BlockWrapper { fn from(block: SignedBeaconBlock) -> Self { - BlockWrapper::Block { - block: Arc::new(block), - } + BlockWrapper::Block(Arc::new(block)) } } impl From>> for BlockWrapper { fn from(block: Arc>) -> Self { - BlockWrapper::Block { block } + BlockWrapper::Block(block) } } diff --git a/crypto/kzg/src/kzg_proof.rs b/crypto/kzg/src/kzg_proof.rs index cb6e14df4..be85088f7 100644 --- a/crypto/kzg/src/kzg_proof.rs +++ b/crypto/kzg/src/kzg_proof.rs @@ -12,6 +12,14 @@ const KZG_PROOF_BYTES_LEN: usize = 48; #[ssz(struct_behaviour = "transparent")] pub struct KzgProof(pub [u8; KZG_PROOF_BYTES_LEN]); +impl KzgProof { + pub fn empty() -> Self { + let mut bytes = [0; KZG_PROOF_BYTES_LEN]; + bytes[0] = 192; + Self(bytes) + } +} + impl fmt::Display for KzgProof { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", eth2_serde_utils::hex::encode(self.0)) From d09523802b9dd3c1ef805a9943db2244a69b9c27 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 23 Dec 2022 12:52:26 -0500 Subject: [PATCH 2/8] impl hash correctly for the blob wrapper --- consensus/types/src/blobs_sidecar.rs | 4 +++- consensus/types/src/signed_block_and_blobs.rs | 19 +++++-------------- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/consensus/types/src/blobs_sidecar.rs b/consensus/types/src/blobs_sidecar.rs index d522227a6..de14f7cb5 100644 --- a/consensus/types/src/blobs_sidecar.rs +++ b/consensus/types/src/blobs_sidecar.rs @@ -1,3 +1,4 @@ +use derivative::Derivative; use crate::test_utils::TestRandom; use crate::{Blob, EthSpec, Hash256, SignedBeaconBlock, SignedRoot, Slot}; use kzg::KzgProof; @@ -10,9 +11,10 @@ use tree_hash_derive::TreeHash; #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] #[derive( - Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq, Default, TestRandom, + Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, Default, TestRandom, Derivative )] #[serde(bound = "T: EthSpec")] +#[derivative(PartialEq, Hash(bound = "T: EthSpec"))] pub struct BlobsSidecar { pub beacon_block_root: Hash256, pub beacon_block_slot: Slot, diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs index 4fcd09de4..721fe59ec 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -5,6 +5,7 @@ use ssz::{Decode, DecodeError}; use ssz_derive::{Decode, Encode}; use std::sync::Arc; use tree_hash_derive::TreeHash; +use derivative::Derivative; #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq)] #[serde(bound = "T: EthSpec")] @@ -13,8 +14,8 @@ pub struct SignedBeaconBlockAndBlobsSidecarDecode { pub blobs_sidecar: BlobsSidecar, } -#[derive(Debug, Clone, Serialize, Deserialize, Encode, TreeHash, PartialEq)] -#[serde(bound = "T: EthSpec")] +#[derive(Debug, Clone, Serialize, Deserialize, Encode, TreeHash, Derivative)] +#[derivative(PartialEq, Hash(bound = "T: EthSpec"))] pub struct SignedBeaconBlockAndBlobsSidecar { pub 
beacon_block: Arc>, pub blobs_sidecar: Arc>, @@ -34,7 +35,8 @@ impl SignedBeaconBlockAndBlobsSidecar { } /// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobsSidecar`]. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Derivative)] +#[derivative(PartialEq, Hash(bound = "T: EthSpec"))] pub enum BlockWrapper { Block(Arc>), BlockAndBlob(SignedBeaconBlockAndBlobsSidecar), @@ -105,17 +107,6 @@ impl BlockWrapper { } } -// FIXME(sean): probably needs to be changed. This is needed because SignedBeaconBlockAndBlobsSidecar -// does not implement Hash -impl std::hash::Hash for BlockWrapper { - fn hash(&self, state: &mut H) { - match self { - BlockWrapper::Block(block) => block.hash(state), - BlockWrapper::BlockAndBlob(block_and_blob) => block_and_blob.beacon_block.hash(state), - } - } -} - impl From> for BlockWrapper { fn from(block: SignedBeaconBlock) -> Self { BlockWrapper::Block(Arc::new(block)) From 1dc0759f57be77bda73f7cbd2b384648cb4d8967 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 23 Dec 2022 12:53:59 -0500 Subject: [PATCH 3/8] impl hash correctly for the blob wrapper --- beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- beacon_node/beacon_chain/src/early_attester_cache.rs | 2 +- consensus/types/src/signed_block_and_blobs.rs | 8 +++++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ab0081715..a8ed70561 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2944,7 +2944,7 @@ impl BeaconChain { ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutState(block.state_root(), &state)); - if let Some(blobs) = blobs { + if let Some(blobs) = blobs? 
{ //FIXME(sean) using this for debugging for now info!(self.log, "Writing blobs to store"; "block_root" => ?block_root); ops.push(StoreOp::PutBlobs(block_root, blobs)); diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index f7b69a0d7..9254a3eb3 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -77,7 +77,7 @@ impl EarlyAttesterCache { source, target, block, - blobs, + blobs: blobs?, proto_block, }; diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs index 721fe59ec..2c9955bdb 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -93,15 +93,17 @@ impl BlockWrapper { self.block().parent_root() } - pub fn deconstruct(self) -> (Arc>, Option>>) { + pub fn deconstruct(self) -> (Arc>, Result>>, BlobReconstructionError>) { match self { - BlockWrapper::Block(block) => (block, None), + BlockWrapper::Block(block) => (block, block + .reconstruct_empty_blobs(block_root) + .map(|blob_opt| blob_opt.map(Arc::new))), BlockWrapper::BlockAndBlob(block_sidecar_pair) => { let SignedBeaconBlockAndBlobsSidecar { beacon_block, blobs_sidecar, } = block_sidecar_pair; - (beacon_block, Some(blobs_sidecar)) + (beacon_block, Ok(Some(blobs_sidecar))) } } } From adf5f462d5bea99bcc3d603398d295bcf6769319 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 23 Dec 2022 12:59:04 -0500 Subject: [PATCH 4/8] fix blob validation for empty blobs when using --- beacon_node/beacon_chain/src/attester_cache.rs | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- beacon_node/beacon_chain/src/early_attester_cache.rs | 4 ++-- consensus/types/src/signed_block_and_blobs.rs | 11 +++++++---- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 24963a125..8e2dfdab8 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -42,6 +42,7 @@ pub enum Error { // Boxed to avoid an infinite-size recursion issue. BeaconChain(Box), MissingBeaconState(Hash256), + MissingBlobs, FailedToTransitionState(StateAdvanceError), CannotAttestToFutureState { state_slot: Slot, diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a8ed70561..3e2be9202 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2935,7 +2935,7 @@ impl BeaconChain { // If the write fails, revert fork choice to the version from disk, else we can // end up with blocks in fork choice that are missing from disk. 
// See https://github.com/sigp/lighthouse/issues/2028 - let (signed_block, blobs) = signed_block.deconstruct(); + let (signed_block, blobs) = signed_block.deconstruct(Some(block_root)); let block = signed_block.message(); let mut ops: Vec<_> = confirmed_state_roots .into_iter() diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index 9254a3eb3..d8382481f 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -69,7 +69,7 @@ impl EarlyAttesterCache { }, }; - let (block, blobs) = block.deconstruct(); + let (block, blobs) = block.deconstruct(Some(beacon_block_root)); let item = CacheItem { epoch, committee_lengths, @@ -77,7 +77,7 @@ impl EarlyAttesterCache { source, target, block, - blobs: blobs?, + blobs: blobs.map_err(|_|Error::MissingBlobs)?, proto_block, }; diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs index 2c9955bdb..fead50f38 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -93,11 +93,14 @@ impl BlockWrapper { self.block().parent_root() } - pub fn deconstruct(self) -> (Arc>, Result>>, BlobReconstructionError>) { + pub fn deconstruct(self, block_root: Option) -> (Arc>, Result>>, BlobReconstructionError>) { match self { - BlockWrapper::Block(block) => (block, block - .reconstruct_empty_blobs(block_root) - .map(|blob_opt| blob_opt.map(Arc::new))), + BlockWrapper::Block(block) => { + let blobs = block + .reconstruct_empty_blobs(block_root) + .map(|blob_opt| blob_opt.map(Arc::new)); + (block,blobs) + } , BlockWrapper::BlockAndBlob(block_sidecar_pair) => { let SignedBeaconBlockAndBlobsSidecar { beacon_block, From aeb243fe61492fc5edb0fd81bbbb42f0c592744f Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 23 Dec 2022 17:44:50 -0500 Subject: [PATCH 5/8] fmt --- .../beacon_chain/src/early_attester_cache.rs | 2 +- consensus/types/src/blobs_sidecar.rs | 4 ++-- consensus/types/src/signed_block_and_blobs.rs | 14 ++++++++++---- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index d8382481f..1216d5d4d 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -77,7 +77,7 @@ impl EarlyAttesterCache { source, target, block, - blobs: blobs.map_err(|_|Error::MissingBlobs)?, + blobs: blobs.map_err(|_| Error::MissingBlobs)?, proto_block, }; diff --git a/consensus/types/src/blobs_sidecar.rs b/consensus/types/src/blobs_sidecar.rs index d51d35d1c..f43a2e650 100644 --- a/consensus/types/src/blobs_sidecar.rs +++ b/consensus/types/src/blobs_sidecar.rs @@ -1,6 +1,6 @@ -use derivative::Derivative; use crate::test_utils::TestRandom; use crate::{Blob, EthSpec, Hash256, SignedRoot, Slot}; +use derivative::Derivative; use kzg::KzgProof; use serde_derive::{Deserialize, Serialize}; use ssz::Encode; @@ -11,7 +11,7 @@ use tree_hash_derive::TreeHash; #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] #[derive( - Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, Default, TestRandom, Derivative + Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, Default, TestRandom, Derivative, )] #[serde(bound = "T: EthSpec")] #[derivative(PartialEq, Hash(bound = "T: EthSpec"))] diff --git a/consensus/types/src/signed_block_and_blobs.rs 
b/consensus/types/src/signed_block_and_blobs.rs index fead50f38..f21545f27 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -1,11 +1,11 @@ use crate::signed_beacon_block::BlobReconstructionError; use crate::{BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockEip4844, Slot}; +use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, DecodeError}; use ssz_derive::{Decode, Encode}; use std::sync::Arc; use tree_hash_derive::TreeHash; -use derivative::Derivative; #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq)] #[serde(bound = "T: EthSpec")] @@ -93,14 +93,20 @@ impl BlockWrapper { self.block().parent_root() } - pub fn deconstruct(self, block_root: Option) -> (Arc>, Result>>, BlobReconstructionError>) { + pub fn deconstruct( + self, + block_root: Option, + ) -> ( + Arc>, + Result>>, BlobReconstructionError>, + ) { match self { BlockWrapper::Block(block) => { let blobs = block .reconstruct_empty_blobs(block_root) .map(|blob_opt| blob_opt.map(Arc::new)); - (block,blobs) - } , + (block, blobs) + } BlockWrapper::BlockAndBlob(block_sidecar_pair) => { let SignedBeaconBlockAndBlobsSidecar { beacon_block, From 502b5e5bf0bb2030d304ee54d677d63f1ae4d601 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 28 Dec 2022 09:32:29 -0500 Subject: [PATCH 6/8] unused error lint --- beacon_node/network/src/sync/range_sync/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 93d5e3e19..80f34f8b4 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -559,7 +559,7 @@ impl slog::KV for BatchInfo { serializer.emit_usize("processed", self.failed_processing_attempts.len())?; serializer.emit_u8("processed_no_penalty", self.non_faulty_processing_attempts)?; serializer.emit_arguments("state", &format_args!("{:?}", self.state))?; - serializer.emit_arguments("batch_ty", &format_args!("{}", self.batch_type)); + serializer.emit_arguments("batch_ty", &format_args!("{}", self.batch_type))?; slog::Result::Ok(()) } } From 5b3b34a9d75aa837cbaaf41104ed35641dfbd504 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 28 Dec 2022 10:28:45 -0500 Subject: [PATCH 7/8] renames, remove , wrap BlockWrapper enum to make descontruction private --- .../beacon_chain/src/block_verification.rs | 2 +- beacon_node/http_api/src/publish_blocks.rs | 41 +++--- .../src/rpc/codec/ssz_snappy.rs | 13 +- .../lighthouse_network/src/rpc/methods.rs | 8 +- .../src/service/api_types.rs | 4 +- .../lighthouse_network/src/service/mod.rs | 2 +- .../network/src/beacon_processor/mod.rs | 4 +- .../beacon_processor/worker/rpc_methods.rs | 4 +- .../beacon_processor/worker/sync_methods.rs | 9 +- beacon_node/network/src/router/processor.rs | 30 ++--- .../network/src/sync/backfill_sync/mod.rs | 2 +- .../src/sync/block_sidecar_coupling.rs | 20 +-- beacon_node/network/src/sync/manager.rs | 118 ++++++++---------- .../network/src/sync/network_context.rs | 97 +++++++------- .../network/src/sync/range_sync/batch.rs | 62 ++------- .../network/src/sync/range_sync/chain.rs | 2 +- .../network/src/sync/range_sync/mod.rs | 4 +- .../network/src/sync/range_sync/range.rs | 6 +- consensus/types/src/signed_block_and_blobs.rs | 71 +++++++---- 19 files changed, 231 insertions(+), 268 deletions(-) diff --git 
a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index c8d3aed79..2b759e4ad 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1141,7 +1141,7 @@ impl IntoExecutionPendingBlock for Arc( // Send the block, regardless of whether or not it is valid. The API // specification is very clear that this is the desired behaviour. - let wrapped_block = if matches!(block.as_ref(), &SignedBeaconBlock::Eip4844(_)) { - if let Some(sidecar) = chain.blob_cache.pop(&block_root) { - let block_and_blobs = SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: Arc::new(sidecar), - }; - crate::publish_pubsub_message( - network_tx, - PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs.clone()), - )?; - BlockWrapper::BlockAndBlob(block_and_blobs) + let wrapped_block: BlockWrapper = + if matches!(block.as_ref(), &SignedBeaconBlock::Eip4844(_)) { + if let Some(sidecar) = chain.blob_cache.pop(&block_root) { + let block_and_blobs = SignedBeaconBlockAndBlobsSidecar { + beacon_block: block, + blobs_sidecar: Arc::new(sidecar), + }; + crate::publish_pubsub_message( + network_tx, + PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs.clone()), + )?; + block_and_blobs.into() + } else { + //FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required + return Err(warp_utils::reject::broadcast_without_import(format!( + "no blob cached for block" + ))); + } } else { - //FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required - return Err(warp_utils::reject::broadcast_without_import(format!( - "no blob cached for block" - ))); - } - } else { - crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?; - BlockWrapper::Block(block) - }; + crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?; + block.into() + }; // Determine the delay after the start of the slot, register it with metrics. let block = wrapped_block.block(); diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index df5350d94..eb5cc7f27 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -73,7 +73,7 @@ impl Encoder> for SSZSnappyInboundCodec< RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(), - RPCResponse::BlobsByRoot(res) => res.as_ssz_bytes(), + RPCResponse::BlockAndBlobsByRoot(res) => res.as_ssz_bytes(), RPCResponse::LightClientBootstrap(res) => res.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => @@ -439,7 +439,8 @@ fn context_bytes( SignedBeaconBlock::Base { .. 
} => Some(fork_context.genesis_context_bytes()), }; } - if let RPCResponse::BlobsByRange(_) | RPCResponse::BlobsByRoot(_) = rpc_variant { + if let RPCResponse::BlobsByRange(_) | RPCResponse::BlockAndBlobsByRoot(_) = rpc_variant + { return fork_context.to_context_bytes(ForkName::Eip4844); } } @@ -585,7 +586,7 @@ fn handle_v1_response( )))), _ => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, - "Invalid forkname for blobsbyrange".to_string(), + "Invalid fork name for blobs by range".to_string(), )), } } @@ -597,12 +598,12 @@ fn handle_v1_response( ) })?; match fork_name { - ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRoot(Arc::new( + ForkName::Eip4844 => Ok(Some(RPCResponse::BlockAndBlobsByRoot( SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?, - )))), + ))), _ => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, - "Invalid forkname for blobsbyroot".to_string(), + "Invalid fork name for block and blobs by root".to_string(), )), } } diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 53e6b6759..02e24d8e1 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -281,7 +281,7 @@ pub enum RPCResponse { LightClientBootstrap(LightClientBootstrap), /// A response to a get BLOBS_BY_ROOT request. - BlobsByRoot(Arc>), + BlockAndBlobsByRoot(SignedBeaconBlockAndBlobsSidecar), /// A PONG response to a PING request. Pong(Ping), @@ -372,7 +372,7 @@ impl RPCCodedResponse { RPCResponse::BlocksByRange(_) => true, RPCResponse::BlocksByRoot(_) => true, RPCResponse::BlobsByRange(_) => true, - RPCResponse::BlobsByRoot(_) => true, + RPCResponse::BlockAndBlobsByRoot(_) => true, RPCResponse::Pong(_) => false, RPCResponse::MetaData(_) => false, RPCResponse::LightClientBootstrap(_) => false, @@ -409,7 +409,7 @@ impl RPCResponse { RPCResponse::BlocksByRange(_) => Protocol::BlocksByRange, RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, RPCResponse::BlobsByRange(_) => Protocol::BlobsByRange, - RPCResponse::BlobsByRoot(_) => Protocol::BlobsByRoot, + RPCResponse::BlockAndBlobsByRoot(_) => Protocol::BlobsByRoot, RPCResponse::Pong(_) => Protocol::Ping, RPCResponse::MetaData(_) => Protocol::MetaData, RPCResponse::LightClientBootstrap(_) => Protocol::LightClientBootstrap, @@ -449,7 +449,7 @@ impl std::fmt::Display for RPCResponse { RPCResponse::BlobsByRange(blob) => { write!(f, "BlobsByRange: Blob slot: {}", blob.beacon_block_slot) } - RPCResponse::BlobsByRoot(blob) => { + RPCResponse::BlockAndBlobsByRoot(blob) => { write!( f, "BlobsByRoot: Blob slot: {}", diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index b6a033020..c9c239d8c 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -83,7 +83,7 @@ pub enum Response { /// A response to a LightClientUpdate request. LightClientBootstrap(LightClientBootstrap), /// A response to a get BLOBS_BY_ROOT request. 
- BlobsByRoot(Option>>), + BlobsByRoot(Option>), } impl std::convert::From> for RPCCodedResponse { @@ -98,7 +98,7 @@ impl std::convert::From> for RPCCodedResponse RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange), }, Response::BlobsByRoot(r) => match r { - Some(b) => RPCCodedResponse::Success(RPCResponse::BlobsByRoot(b)), + Some(b) => RPCCodedResponse::Success(RPCResponse::BlockAndBlobsByRoot(b)), None => RPCCodedResponse::StreamTermination(ResponseTermination::BlobsByRoot), }, Response::BlobsByRange(r) => match r { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 9adf7699b..d59bc4bfd 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1315,7 +1315,7 @@ impl Network { RPCResponse::BlocksByRoot(resp) => { self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } - RPCResponse::BlobsByRoot(resp) => { + RPCResponse::BlockAndBlobsByRoot(resp) => { self.build_response(id, peer_id, Response::BlobsByRoot(Some(resp))) } // Should never be reached diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 9de006c84..37d6edef8 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -1699,7 +1699,7 @@ impl BeaconProcessor { message_id, peer_id, peer_client, - BlockWrapper::Block(block), + block.into(), work_reprocessing_tx, duplicate_cache, seen_timestamp, @@ -1721,7 +1721,7 @@ impl BeaconProcessor { message_id, peer_id, peer_client, - BlockWrapper::BlockAndBlob(block_sidecar_pair), + block_sidecar_pair.into(), work_reprocessing_tx, duplicate_cache, seen_timestamp, diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 3ade1bb87..69bd7da11 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -230,10 +230,10 @@ impl Worker { Ok((Some(block), Some(blobs))) => { self.send_response( peer_id, - Response::BlobsByRoot(Some(Arc::new(SignedBeaconBlockAndBlobsSidecar { + Response::BlobsByRoot(Some(SignedBeaconBlockAndBlobsSidecar { beacon_block: block, blobs_sidecar: blobs, - }))), + })), request_id, ); send_block_count += 1; diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 5af62c37d..b3465c56d 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -188,14 +188,7 @@ impl Worker { let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); let sent_blocks = downloaded_blocks.len(); - let unwrapped = downloaded_blocks - .into_iter() - .map(|block| match block { - BlockWrapper::Block(block) => block, - //FIXME(sean) handle blobs in backfill - BlockWrapper::BlockAndBlob(_) => todo!(), - }) - .collect(); + let unwrapped = downloaded_blocks.into_iter().map(|_| todo!()).collect(); match self.process_backfill_blocks(unwrapped) { (_, Ok(_)) => { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 5ee0e367b..d0879baba 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -223,10 +223,10 @@ impl Processor { SyncId::SingleBlock { .. 
} | SyncId::ParentLookup { .. } => { unreachable!("Block lookups do not request BBRange requests") } - id @ (SyncId::BackFillSync { .. } - | SyncId::RangeSync { .. } - | SyncId::BackFillSidecarPair { .. } - | SyncId::RangeSidecarPair { .. }) => id, + id @ (SyncId::BackFillBlocks { .. } + | SyncId::RangeBlocks { .. } + | SyncId::BackFillBlobs { .. } + | SyncId::RangeBlobs { .. }) => id, }, RequestId::Router => unreachable!("All BBRange requests belong to sync"), }; @@ -258,7 +258,7 @@ impl Processor { ); if let RequestId::Sync(id) = request_id { - self.send_to_sync(SyncMessage::RpcGlob { + self.send_to_sync(SyncMessage::RpcBlobs { peer_id, request_id: id, blob_sidecar, @@ -282,10 +282,10 @@ impl Processor { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, - SyncId::BackFillSync { .. } - | SyncId::RangeSync { .. } - | SyncId::RangeSidecarPair { .. } - | SyncId::BackFillSidecarPair { .. } => { + SyncId::BackFillBlocks { .. } + | SyncId::RangeBlocks { .. } + | SyncId::RangeBlobs { .. } + | SyncId::BackFillBlobs { .. } => { unreachable!("Batch syncing do not request BBRoot requests") } }, @@ -310,15 +310,15 @@ impl Processor { &mut self, peer_id: PeerId, request_id: RequestId, - block_and_blobs: Option>>, + block_and_blobs: Option>, ) { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, - SyncId::BackFillSync { .. } - | SyncId::RangeSync { .. } - | SyncId::RangeSidecarPair { .. } - | SyncId::BackFillSidecarPair { .. } => { + SyncId::BackFillBlocks { .. } + | SyncId::RangeBlocks { .. } + | SyncId::RangeBlobs { .. } + | SyncId::BackFillBlobs { .. } => { unreachable!("Batch syncing does not request BBRoot requests") } }, @@ -330,7 +330,7 @@ impl Processor { "Received BlockAndBlobssByRoot Response"; "peer" => %peer_id, ); - self.send_to_sync(SyncMessage::RpcBlockAndGlob { + self.send_to_sync(SyncMessage::RpcBlockAndBlobs { peer_id, request_id, block_and_blobs, diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 56ed55153..ad1bfb1d4 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -536,7 +536,7 @@ impl BackFillSync { let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id); self.current_processing_batch = Some(batch_id); - let work_event = BeaconWorkEvent::chain_segment(process_id, blocks.into_wrapped_blocks()); + let work_event = BeaconWorkEvent::chain_segment(process_id, blocks); if let Err(e) = network.processor_channel().try_send(work_event) { crit!(self.log, "Failed to send backfill segment to processor."; "msg" => "process_batch", "error" => %e, "batch" => self.processing_target); diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index f82417db3..46ac5bd0f 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,12 +1,9 @@ use std::{collections::VecDeque, sync::Arc}; -use types::{ - signed_block_and_blobs::BlockWrapper, BlobsSidecar, EthSpec, SignedBeaconBlock, - SignedBeaconBlockAndBlobsSidecar, -}; +use types::{signed_block_and_blobs::BlockWrapper, BlobsSidecar, EthSpec, SignedBeaconBlock}; #[derive(Debug, Default)] -pub struct BlockBlobRequestInfo { +pub struct BlocksAndBlobsRequestInfo { /// 
Blocks we have received awaiting for their corresponding sidecar. accumulated_blocks: VecDeque>>, /// Sidecars we have received awaiting for their corresponding block. @@ -17,7 +14,7 @@ pub struct BlockBlobRequestInfo { is_sidecars_stream_terminated: bool, } -impl BlockBlobRequestInfo { +impl BlocksAndBlobsRequestInfo { pub fn add_block_response(&mut self, maybe_block: Option>>) { match maybe_block { Some(block) => self.accumulated_blocks.push_back(block), @@ -33,7 +30,7 @@ impl BlockBlobRequestInfo { } pub fn into_responses(self) -> Result>, &'static str> { - let BlockBlobRequestInfo { + let BlocksAndBlobsRequestInfo { accumulated_blocks, mut accumulated_sidecars, .. @@ -51,14 +48,9 @@ impl BlockBlobRequestInfo { { let blobs_sidecar = accumulated_sidecars.pop_front().ok_or("missing sidecar")?; - Ok(BlockWrapper::BlockAndBlob( - SignedBeaconBlockAndBlobsSidecar { - beacon_block, - blobs_sidecar, - }, - )) + Ok(BlockWrapper::new_with_blobs(beacon_block, blobs_sidecar)) } else { - Ok(BlockWrapper::Block(beacon_block)) + Ok(beacon_block.into()) } }) .collect::, _>>(); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 29838bde7..5da203e0e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,13 +35,13 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; -use super::network_context::{BlockOrBlob, SyncNetworkContext}; +use super::network_context::{BlockOrBlobs, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; -use crate::sync::range_sync::ExpectedBatchTy; +use crate::sync::range_sync::ByRangeRequestType; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; use futures::StreamExt; use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; @@ -79,13 +79,13 @@ pub enum RequestId { /// Request searching for a block's parent. The id is the chain ParentLookup { id: Id }, /// Request was from the backfill sync algorithm. - BackFillSync { id: Id }, - /// Backfill request for blocks and sidecars. - BackFillSidecarPair { id: Id }, + BackFillBlocks { id: Id }, + /// Backfill request for blob sidecars. + BackFillBlobs { id: Id }, /// The request was from a chain in the range sync algorithm. - RangeSync { id: Id }, - /// The request was from a chain in range, asking for ranges of blocks and sidecars. - RangeSidecarPair { id: Id }, + RangeBlocks { id: Id }, + /// The request was from a chain in range, asking for ranges blob sidecars. + RangeBlobs { id: Id }, } #[derive(Debug)] @@ -103,7 +103,7 @@ pub enum SyncMessage { }, /// A blob has been received from the RPC. - RpcGlob { + RpcBlobs { request_id: RequestId, peer_id: PeerId, blob_sidecar: Option>>, @@ -111,10 +111,10 @@ pub enum SyncMessage { }, /// A block and blobs have been received from the RPC. 
- RpcBlockAndGlob { + RpcBlockAndBlobs { request_id: RequestId, peer_id: PeerId, - block_and_blobs: Option>>, + block_and_blobs: Option>, seen_timestamp: Duration, }, @@ -295,10 +295,10 @@ impl SyncManager { self.block_lookups .parent_lookup_failed(id, peer_id, &mut self.network, error); } - RequestId::BackFillSync { id } => { + RequestId::BackFillBlocks { id } => { if let Some(batch_id) = self .network - .backfill_request_failed(id, ExpectedBatchTy::OnlyBlock) + .backfill_request_failed(id, ByRangeRequestType::Blocks) { match self .backfill_sync @@ -310,10 +310,10 @@ impl SyncManager { } } - RequestId::BackFillSidecarPair { id } => { + RequestId::BackFillBlobs { id } => { if let Some(batch_id) = self .network - .backfill_request_failed(id, ExpectedBatchTy::OnlyBlockBlobs) + .backfill_request_failed(id, ByRangeRequestType::BlocksAndBlobs) { match self .backfill_sync @@ -324,10 +324,10 @@ impl SyncManager { } } } - RequestId::RangeSync { id } => { + RequestId::RangeBlocks { id } => { if let Some((chain_id, batch_id)) = self .network - .range_sync_request_failed(id, ExpectedBatchTy::OnlyBlock) + .range_sync_request_failed(id, ByRangeRequestType::Blocks) { self.range_sync.inject_error( &mut self.network, @@ -339,10 +339,10 @@ impl SyncManager { self.update_sync_state() } } - RequestId::RangeSidecarPair { id } => { + RequestId::RangeBlobs { id } => { if let Some((chain_id, batch_id)) = self .network - .range_sync_request_failed(id, ExpectedBatchTy::OnlyBlockBlobs) + .range_sync_request_failed(id, ByRangeRequestType::BlocksAndBlobs) { self.range_sync.inject_error( &mut self.network, @@ -648,18 +648,18 @@ impl SyncManager { .block_lookups .parent_chain_processed(chain_hash, result, &mut self.network), }, - SyncMessage::RpcGlob { + SyncMessage::RpcBlobs { request_id, peer_id, blob_sidecar, seen_timestamp, - } => self.rpc_sidecar_received(request_id, peer_id, blob_sidecar, seen_timestamp), - SyncMessage::RpcBlockAndGlob { + } => self.rpc_blobs_received(request_id, peer_id, blob_sidecar, seen_timestamp), + SyncMessage::RpcBlockAndBlobs { request_id, peer_id, block_and_blobs, seen_timestamp, - } => self.rpc_block_sidecar_pair_received( + } => self.rpc_block_block_and_blobs_received( request_id, peer_id, block_and_blobs, @@ -734,18 +734,18 @@ impl SyncManager { RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( id, peer_id, - beacon_block.map(|block| BlockWrapper::Block(block)), + beacon_block.map(|block| block.into()), seen_timestamp, &mut self.network, ), RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( id, peer_id, - beacon_block.map(|block| BlockWrapper::Block(block)), + beacon_block.map(|block| block.into()), seen_timestamp, &mut self.network, ), - RequestId::BackFillSync { id } => { + RequestId::BackFillBlocks { id } => { let is_stream_terminator = beacon_block.is_none(); if let Some(batch_id) = self .network @@ -756,7 +756,7 @@ impl SyncManager { batch_id, &peer_id, id, - beacon_block.map(|block| BlockWrapper::Block(block)), + beacon_block.map(|block| block.into()), ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::Successful) => {} @@ -768,7 +768,7 @@ impl SyncManager { } } } - RequestId::RangeSync { id } => { + RequestId::RangeBlocks { id } => { let is_stream_terminator = beacon_block.is_none(); if let Some((chain_id, batch_id)) = self .network @@ -780,28 +780,28 @@ impl SyncManager { chain_id, batch_id, id, - beacon_block.map(|block| BlockWrapper::Block(block)), + beacon_block.map(|block| 
block.into()), ); self.update_sync_state(); } } - RequestId::BackFillSidecarPair { id } => { - self.block_blob_backfill_response(id, peer_id, beacon_block.into()) + RequestId::BackFillBlobs { id } => { + self.blobs_backfill_response(id, peer_id, beacon_block.into()) } - RequestId::RangeSidecarPair { id } => { - self.block_blob_range_response(id, peer_id, beacon_block.into()) + RequestId::RangeBlobs { id } => { + self.blobs_range_response(id, peer_id, beacon_block.into()) } } } /// Handles receiving a response for a range sync request that should have both blocks and /// blobs. - fn block_blob_range_response( + fn blobs_range_response( &mut self, id: Id, peer_id: PeerId, - block_or_blob: BlockOrBlob, + block_or_blob: BlockOrBlobs, ) { if let Some((chain_id, batch_id, block_responses)) = self .network @@ -834,7 +834,7 @@ impl SyncManager { "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e ); // TODO: penalize the peer for being a bad boy - let id = RequestId::RangeSidecarPair { id }; + let id = RequestId::RangeBlobs { id }; self.inject_error(peer_id, id, RPCError::InvalidData(e.into())) } } @@ -843,11 +843,11 @@ impl SyncManager { /// Handles receiving a response for a Backfill sync request that should have both blocks and /// blobs. - fn block_blob_backfill_response( + fn blobs_backfill_response( &mut self, id: Id, peer_id: PeerId, - block_or_blob: BlockOrBlob, + block_or_blob: BlockOrBlobs, ) { if let Some((batch_id, block_responses)) = self .network @@ -886,14 +886,14 @@ impl SyncManager { "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e ); // TODO: penalize the peer for being a bad boy - let id = RequestId::BackFillSidecarPair { id }; + let id = RequestId::BackFillBlobs { id }; self.inject_error(peer_id, id, RPCError::InvalidData(e.into())) } } } } - fn rpc_sidecar_received( + fn rpc_blobs_received( &mut self, request_id: RequestId, peer_id: PeerId, @@ -904,57 +904,47 @@ impl SyncManager { RequestId::SingleBlock { .. } | RequestId::ParentLookup { .. } => { unreachable!("There is no such thing as a singular 'by root' glob request that is not accompanied by the block") } - RequestId::BackFillSync { .. } => { + RequestId::BackFillBlocks { .. } => { unreachable!("An only blocks request does not receive sidecars") } - RequestId::BackFillSidecarPair { id } => { - self.block_blob_backfill_response(id, peer_id, maybe_sidecar.into()) + RequestId::BackFillBlobs { id } => { + self.blobs_backfill_response(id, peer_id, maybe_sidecar.into()) } - RequestId::RangeSync { .. } => { + RequestId::RangeBlocks { .. 
} => { unreachable!("Only-blocks range requests don't receive sidecars") } - RequestId::RangeSidecarPair { id } => { - self.block_blob_range_response(id, peer_id, maybe_sidecar.into()) + RequestId::RangeBlobs { id } => { + self.blobs_range_response(id, peer_id, maybe_sidecar.into()) } } } - fn rpc_block_sidecar_pair_received( + fn rpc_block_block_and_blobs_received( &mut self, request_id: RequestId, peer_id: PeerId, - block_sidecar_pair: Option>>, + block_sidecar_pair: Option>, seen_timestamp: Duration, ) { match request_id { RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( id, peer_id, - block_sidecar_pair.map(|block_sidecar_pair| { - BlockWrapper::BlockAndBlob( - // TODO: why is this in an arc - (*block_sidecar_pair).clone(), - ) - }), + block_sidecar_pair.map(|block_sidecar_pair| block_sidecar_pair.into()), seen_timestamp, &mut self.network, ), RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( id, peer_id, - block_sidecar_pair.map(|block_sidecar_pair| { - BlockWrapper::BlockAndBlob( - // TODO: why is this in an arc - (*block_sidecar_pair).clone(), - ) - }), + block_sidecar_pair.map(|block_sidecar_pair| block_sidecar_pair.into()), seen_timestamp, &mut self.network, ), - RequestId::BackFillSync { .. } - | RequestId::BackFillSidecarPair { .. } - | RequestId::RangeSync { .. } - | RequestId::RangeSidecarPair { .. } => unreachable!( + RequestId::BackFillBlocks { .. } + | RequestId::BackFillBlobs { .. } + | RequestId::RangeBlocks { .. } + | RequestId::RangeBlobs { .. } => unreachable!( "since range requests are not block-glob coupled, this should never be reachable" ), } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 36da3bf82..c54b3b1a9 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,9 +1,9 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. -use super::block_sidecar_coupling::BlockBlobRequestInfo; +use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; use super::manager::{Id, RequestId as SyncRequestId}; -use super::range_sync::{BatchId, ChainId, ExpectedBatchTy}; +use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use crate::beacon_processor::WorkEvent; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; @@ -38,11 +38,12 @@ pub struct SyncNetworkContext { backfill_requests: FnvHashMap, /// BlocksByRange requests paired with BlobsByRange requests made by the range. - range_sidecar_pair_requests: - FnvHashMap)>, + range_blocks_and_blobs_requests: + FnvHashMap)>, /// BlocksByRange requests paired with BlobsByRange requests made by the backfill sync. - backfill_sidecar_pair_requests: FnvHashMap)>, + backfill_blocks_and_blobs_requests: + FnvHashMap)>, /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. @@ -58,20 +59,20 @@ pub struct SyncNetworkContext { } /// Small enumeration to make dealing with block and blob requests easier. 
-pub enum BlockOrBlob { +pub enum BlockOrBlobs { Block(Option>>), - Blob(Option>>), + Blobs(Option>>), } -impl From>>> for BlockOrBlob { +impl From>>> for BlockOrBlobs { fn from(block: Option>>) -> Self { - BlockOrBlob::Block(block) + BlockOrBlobs::Block(block) } } -impl From>>> for BlockOrBlob { +impl From>>> for BlockOrBlobs { fn from(blob: Option>>) -> Self { - BlockOrBlob::Blob(blob) + BlockOrBlobs::Blobs(blob) } } @@ -89,8 +90,8 @@ impl SyncNetworkContext { request_id: 1, range_requests: Default::default(), backfill_requests: Default::default(), - range_sidecar_pair_requests: Default::default(), - backfill_sidecar_pair_requests: Default::default(), + range_blocks_and_blobs_requests: Default::default(), + backfill_blocks_and_blobs_requests: Default::default(), execution_engine_state: EngineState::Online, // always assume `Online` at the start beacon_processor_send, chain, @@ -140,13 +141,13 @@ impl SyncNetworkContext { pub fn blocks_by_range_request( &mut self, peer_id: PeerId, - batch_type: ExpectedBatchTy, + batch_type: ByRangeRequestType, request: BlocksByRangeRequest, chain_id: ChainId, batch_id: BatchId, ) -> Result { match batch_type { - ExpectedBatchTy::OnlyBlock => { + ByRangeRequestType::Blocks => { trace!( self.log, "Sending BlocksByRange request"; @@ -156,7 +157,7 @@ impl SyncNetworkContext { ); let request = Request::BlocksByRange(request); let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::RangeSync { id }); + let request_id = RequestId::Sync(SyncRequestId::RangeBlocks { id }); self.send_network_msg(NetworkMessage::SendRequest { peer_id, request, @@ -165,7 +166,7 @@ impl SyncNetworkContext { self.range_requests.insert(id, (chain_id, batch_id)); Ok(id) } - ExpectedBatchTy::OnlyBlockBlobs => { + ByRangeRequestType::BlocksAndBlobs => { debug!( self.log, "Sending BlocksByRange and BlobsByRange requests"; @@ -176,7 +177,7 @@ impl SyncNetworkContext { // create the shared request id. This is fine since the rpc handles substream ids. let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::RangeSidecarPair { id }); + let request_id = RequestId::Sync(SyncRequestId::RangeBlobs { id }); // Create the blob request based on the blob request. 
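At this point the range sync fires a BlocksByRange request and a BlobsByRange request that must cover the same slot span, so the two response streams can be re-coupled afterwards. A rough sketch of deriving the blob request from the block request (the request shapes and field names here are assumptions, simplified from the real RPC types):

// Simplified request shapes; the real ones live in the lighthouse_network RPC methods.
struct BlocksByRangeRequest {
    start_slot: u64,
    count: u64,
}

struct BlobsByRangeRequest {
    start_slot: u64,
    count: u64,
}

// The blob request mirrors the block request so both responses describe the
// same slot span and can be zipped back together once both streams finish.
fn blobs_request_for(blocks: &BlocksByRangeRequest) -> BlobsByRangeRequest {
    BlobsByRangeRequest {
        start_slot: blocks.start_slot,
        count: blocks.count,
    }
}

fn main() {
    let blocks = BlocksByRangeRequest { start_slot: 96, count: 32 };
    let blobs = blobs_request_for(&blocks);
    assert_eq!((blobs.start_slot, blobs.count), (96, 32));
}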
let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { @@ -196,8 +197,8 @@ impl SyncNetworkContext { request: blobs_request, request_id, })?; - let block_blob_info = BlockBlobRequestInfo::default(); - self.range_sidecar_pair_requests + let block_blob_info = BlocksAndBlobsRequestInfo::default(); + self.range_blocks_and_blobs_requests .insert(id, (chain_id, batch_id, block_blob_info)); Ok(id) } @@ -208,12 +209,12 @@ impl SyncNetworkContext { pub fn backfill_blocks_by_range_request( &mut self, peer_id: PeerId, - batch_type: ExpectedBatchTy, + batch_type: ByRangeRequestType, request: BlocksByRangeRequest, batch_id: BatchId, ) -> Result { match batch_type { - ExpectedBatchTy::OnlyBlock => { + ByRangeRequestType::Blocks => { trace!( self.log, "Sending backfill BlocksByRange request"; @@ -223,7 +224,7 @@ impl SyncNetworkContext { ); let request = Request::BlocksByRange(request); let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::BackFillSync { id }); + let request_id = RequestId::Sync(SyncRequestId::BackFillBlocks { id }); self.send_network_msg(NetworkMessage::SendRequest { peer_id, request, @@ -232,7 +233,7 @@ impl SyncNetworkContext { self.backfill_requests.insert(id, batch_id); Ok(id) } - ExpectedBatchTy::OnlyBlockBlobs => { + ByRangeRequestType::BlocksAndBlobs => { debug!( self.log, "Sending backfill BlocksByRange and BlobsByRange requests"; @@ -243,7 +244,7 @@ impl SyncNetworkContext { // create the shared request id. This is fine since the rpc handles substream ids. let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::BackFillSidecarPair { id }); + let request_id = RequestId::Sync(SyncRequestId::BackFillBlobs { id }); // Create the blob request based on the blob request. let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { @@ -263,8 +264,8 @@ impl SyncNetworkContext { request: blobs_request, request_id, })?; - let block_blob_info = BlockBlobRequestInfo::default(); - self.backfill_sidecar_pair_requests + let block_blob_info = BlocksAndBlobsRequestInfo::default(); + self.backfill_blocks_and_blobs_requests .insert(id, (batch_id, block_blob_info)); Ok(id) } @@ -288,18 +289,18 @@ impl SyncNetworkContext { pub fn range_sync_block_and_blob_response( &mut self, request_id: Id, - block_or_blob: BlockOrBlob, + block_or_blob: BlockOrBlobs, ) -> Option<( ChainId, BatchId, Result>, &'static str>, )> { - match self.range_sidecar_pair_requests.entry(request_id) { + match self.range_blocks_and_blobs_requests.entry(request_id) { Entry::Occupied(mut entry) => { let (_, _, info) = entry.get_mut(); match block_or_blob { - BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block), + BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything @@ -316,28 +317,28 @@ impl SyncNetworkContext { pub fn range_sync_request_failed( &mut self, request_id: Id, - batch_type: ExpectedBatchTy, + batch_type: ByRangeRequestType, ) -> Option<(ChainId, BatchId)> { match batch_type { - ExpectedBatchTy::OnlyBlockBlobs => self - .range_sidecar_pair_requests + ByRangeRequestType::BlocksAndBlobs => self + .range_blocks_and_blobs_requests .remove(&request_id) .map(|(chain_id, batch_id, _info)| (chain_id, batch_id)), - ExpectedBatchTy::OnlyBlock => self.range_requests.remove(&request_id), + 
ByRangeRequestType::Blocks => self.range_requests.remove(&request_id), } } pub fn backfill_request_failed( &mut self, request_id: Id, - batch_type: ExpectedBatchTy, + batch_type: ByRangeRequestType, ) -> Option { match batch_type { - ExpectedBatchTy::OnlyBlockBlobs => self - .backfill_sidecar_pair_requests + ByRangeRequestType::BlocksAndBlobs => self + .backfill_blocks_and_blobs_requests .remove(&request_id) .map(|(batch_id, _info)| batch_id), - ExpectedBatchTy::OnlyBlock => self.backfill_requests.remove(&request_id), + ByRangeRequestType::Blocks => self.backfill_requests.remove(&request_id), } } @@ -360,14 +361,14 @@ impl SyncNetworkContext { pub fn backfill_sync_block_and_blob_response( &mut self, request_id: Id, - block_or_blob: BlockOrBlob, + block_or_blob: BlockOrBlobs, ) -> Option<(BatchId, Result>, &'static str>)> { - match self.backfill_sidecar_pair_requests.entry(request_id) { + match self.backfill_blocks_and_blobs_requests.entry(request_id) { Entry::Occupied(mut entry) => { let (_, info) = entry.get_mut(); match block_or_blob { - BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block), + BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything @@ -533,7 +534,7 @@ impl SyncNetworkContext { /// Check whether a batch for this epoch (and only this epoch) should request just blocks or /// blocks and blobs. - pub fn batch_type(&self, epoch: types::Epoch) -> ExpectedBatchTy { + pub fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType { const _: () = assert!( super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1 && super::range_sync::EPOCHS_PER_BATCH == 1, @@ -542,18 +543,18 @@ impl SyncNetworkContext { #[cfg(test)] { // Keep tests only for blocks. - return ExpectedBatchTy::OnlyBlock; + return ByRangeRequestType::Blocks; } #[cfg(not(test))] { if let Some(data_availability_boundary) = self.chain.data_availability_boundary() { if epoch >= data_availability_boundary { - ExpectedBatchTy::OnlyBlockBlobs + ByRangeRequestType::BlocksAndBlobs } else { - ExpectedBatchTy::OnlyBlock + ByRangeRequestType::Blocks } } else { - ExpectedBatchTy::OnlyBlock + ByRangeRequestType::Blocks } } } diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 80f34f8b4..184dcffc4 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -4,10 +4,9 @@ use lighthouse_network::PeerId; use std::collections::HashSet; use std::hash::{Hash, Hasher}; use std::ops::Sub; -use std::sync::Arc; use strum::Display; use types::signed_block_and_blobs::BlockWrapper; -use types::{Epoch, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot}; +use types::{Epoch, EthSpec, Slot}; /// The number of times to retry a batch before it is considered failed. const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; @@ -16,36 +15,12 @@ const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; /// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. 
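The `batch_type` helper above asks the chain for its data availability boundary and only requests coupled blocks-and-blobs batches for epochs at or after that boundary; earlier epochs, or a chain where EIP-4844 is not yet scheduled, fall back to blocks only. A minimal sketch of that decision, with the boundary modelled as a plain `Option<u64>` rather than the real chain accessor:

#[derive(Debug, PartialEq)]
enum ByRangeRequestType {
    BlocksAndBlobs,
    Blocks,
}

// Epoch is a plain u64 here; the real code uses types::Epoch.
fn batch_type(data_availability_boundary: Option<u64>, epoch: u64) -> ByRangeRequestType {
    match data_availability_boundary {
        // At or after the boundary, blobs must be available, so request them too.
        Some(boundary) if epoch >= boundary => ByRangeRequestType::BlocksAndBlobs,
        // Before the boundary (or with no boundary scheduled), blocks suffice.
        _ => ByRangeRequestType::Blocks,
    }
}

fn main() {
    assert_eq!(batch_type(Some(100), 99), ByRangeRequestType::Blocks);
    assert_eq!(batch_type(Some(100), 100), ByRangeRequestType::BlocksAndBlobs);
    assert_eq!(batch_type(None, 1_000), ByRangeRequestType::Blocks);
}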
const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; -pub enum BatchTy { - Blocks(Vec>>), - BlocksAndBlobs(Vec>), -} - -impl BatchTy { - pub fn into_wrapped_blocks(self) -> Vec> { - match self { - BatchTy::Blocks(blocks) => blocks - .into_iter() - .map(|block| BlockWrapper::Block(block)) - .collect(), - BatchTy::BlocksAndBlobs(block_sidecar_pair) => block_sidecar_pair - .into_iter() - .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob(block_sidecar_pair)) - .collect(), - } - } -} - -/// Error representing a batch with mixed block types. -#[derive(Debug)] -pub struct MixedBlockTyErr; - /// Type of expected batch. #[derive(Debug, Copy, Clone, Display)] #[strum(serialize_all = "snake_case")] -pub enum ExpectedBatchTy { - OnlyBlockBlobs, - OnlyBlock, +pub enum ByRangeRequestType { + BlocksAndBlobs, + Blocks, } /// Allows customisation of the above constants used in other sync methods such as BackFillSync. @@ -131,7 +106,7 @@ pub struct BatchInfo { /// State of the batch. state: BatchState, /// Whether this batch contains all blocks or all blocks and blobs. - batch_type: ExpectedBatchTy, + batch_type: ByRangeRequestType, /// Pin the generic marker: std::marker::PhantomData, } @@ -180,7 +155,7 @@ impl BatchInfo { /// fork boundary will be of mixed type (all blocks and one last blockblob), and I don't want to /// deal with this for now. /// This means finalization might be slower in eip4844 - pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ExpectedBatchTy) -> Self { + pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ByRangeRequestType) -> Self { let start_slot = start_epoch.start_slot(T::slots_per_epoch()); let end_slot = start_slot + num_of_epochs * T::slots_per_epoch(); BatchInfo { @@ -243,7 +218,7 @@ impl BatchInfo { } /// Returns a BlocksByRange request associated with the batch. - pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ExpectedBatchTy) { + pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) { ( BlocksByRangeRequest { start_slot: self.start_slot.into(), @@ -408,30 +383,11 @@ impl BatchInfo { } } - pub fn start_processing(&mut self) -> Result, WrongState> { + pub fn start_processing(&mut self) -> Result>, WrongState> { match self.state.poison() { BatchState::AwaitingProcessing(peer, blocks) => { self.state = BatchState::Processing(Attempt::new::(peer, &blocks)); - match self.batch_type { - ExpectedBatchTy::OnlyBlockBlobs => { - let blocks = blocks.into_iter().map(|block| { - let BlockWrapper::BlockAndBlob(block_and_blob) = block else { - panic!("Batches should never have a mixed type. This is a bug. Contact D") - }; - block_and_blob - }).collect(); - Ok(BatchTy::BlocksAndBlobs(blocks)) - } - ExpectedBatchTy::OnlyBlock => { - let blocks = blocks.into_iter().map(|block| { - let BlockWrapper::Block(block) = block else { - panic!("Batches should never have a mixed type. This is a bug. 
Contact D") - }; - block - }).collect(); - Ok(BatchTy::Blocks(blocks)) - } - } + Ok(blocks) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 89e120050..d60de3224 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -332,7 +332,7 @@ impl SyncingChain { let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized); self.current_processing_batch = Some(batch_id); - let work_event = BeaconWorkEvent::chain_segment(process_id, blocks.into_wrapped_blocks()); + let work_event = BeaconWorkEvent::chain_segment(process_id, blocks); if let Err(e) = beacon_processor_send.try_send(work_event) { crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch", diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 284260321..d0f2f9217 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -9,8 +9,8 @@ mod range; mod sync_type; pub use batch::{ - BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, BatchTy, - ExpectedBatchTy, + BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + ByRangeRequestType, }; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; pub use range::RangeSync; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 1e3474fa5..09d93b0e8 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -373,7 +373,7 @@ where #[cfg(test)] mod tests { use crate::service::RequestId; - use crate::sync::range_sync::ExpectedBatchTy; + use crate::sync::range_sync::ByRangeRequestType; use crate::NetworkMessage; use super::*; @@ -686,7 +686,7 @@ mod tests { let (peer1, local_info, head_info) = rig.head_peer(); range.add_peer(&mut rig.cx, local_info, peer1, head_info); let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 { - RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => { + RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => { (rig.cx.range_sync_response(id, true).unwrap(), id) } other => panic!("unexpected request {:?}", other), @@ -705,7 +705,7 @@ mod tests { let (peer2, local_info, finalized_info) = rig.finalized_peer(); range.add_peer(&mut rig.cx, local_info, peer2, finalized_info); let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 { - RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => { + RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => { (rig.cx.range_sync_response(id, true).unwrap(), id) } other => panic!("unexpected request {:?}", other), diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs index f21545f27..c589fbcfe 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -34,33 +34,56 @@ impl SignedBeaconBlockAndBlobsSidecar { } } +/// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobsSidecar`]. This newtype +/// wraps the `BlockWrapperInner` to ensure blobs cannot be accessed via an enum match. This would +/// circumvent empty blob reconstruction when accessing blobs. 
+#[derive(Clone, Debug, Derivative)] +#[derivative(PartialEq, Hash(bound = "T: EthSpec"))] +pub struct BlockWrapper(BlockWrapperInner); + /// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobsSidecar`]. #[derive(Clone, Debug, Derivative)] #[derivative(PartialEq, Hash(bound = "T: EthSpec"))] -pub enum BlockWrapper { +pub enum BlockWrapperInner { Block(Arc>), BlockAndBlob(SignedBeaconBlockAndBlobsSidecar), } impl BlockWrapper { + pub fn new(block: Arc>) -> Self { + Self(BlockWrapperInner::Block(block)) + } + + pub fn new_with_blobs( + beacon_block: Arc>, + blobs_sidecar: Arc>, + ) -> Self { + Self(BlockWrapperInner::BlockAndBlob( + SignedBeaconBlockAndBlobsSidecar { + beacon_block, + blobs_sidecar, + }, + )) + } + pub fn slot(&self) -> Slot { - match self { - BlockWrapper::Block(block) => block.slot(), - BlockWrapper::BlockAndBlob(block_sidecar_pair) => { + match &self.0 { + BlockWrapperInner::Block(block) => block.slot(), + BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => { block_sidecar_pair.beacon_block.slot() } } } pub fn block(&self) -> &SignedBeaconBlock { - match self { - BlockWrapper::Block(block) => &block, - BlockWrapper::BlockAndBlob(block_sidecar_pair) => &block_sidecar_pair.beacon_block, + match &self.0 { + BlockWrapperInner::Block(block) => &block, + BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => &block_sidecar_pair.beacon_block, } } pub fn block_cloned(&self) -> Arc> { - match self { - BlockWrapper::Block(block) => block.clone(), - BlockWrapper::BlockAndBlob(block_sidecar_pair) => { + match &self.0 { + BlockWrapperInner::Block(block) => block.clone(), + BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => { block_sidecar_pair.beacon_block.clone() } } @@ -70,20 +93,20 @@ impl BlockWrapper { &self, block_root: Option, ) -> Result>>, BlobReconstructionError> { - match self { - BlockWrapper::Block(block) => block + match &self.0 { + BlockWrapperInner::Block(block) => block .reconstruct_empty_blobs(block_root) .map(|blob_opt| blob_opt.map(Arc::new)), - BlockWrapper::BlockAndBlob(block_sidecar_pair) => { + BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => { Ok(Some(block_sidecar_pair.blobs_sidecar.clone())) } } } pub fn message(&self) -> crate::BeaconBlockRef { - match self { - BlockWrapper::Block(block) => block.message(), - BlockWrapper::BlockAndBlob(block_sidecar_pair) => { + match &self.0 { + BlockWrapperInner::Block(block) => block.message(), + BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => { block_sidecar_pair.beacon_block.message() } } @@ -100,14 +123,14 @@ impl BlockWrapper { Arc>, Result>>, BlobReconstructionError>, ) { - match self { - BlockWrapper::Block(block) => { + match self.0 { + BlockWrapperInner::Block(block) => { let blobs = block .reconstruct_empty_blobs(block_root) .map(|blob_opt| blob_opt.map(Arc::new)); (block, blobs) } - BlockWrapper::BlockAndBlob(block_sidecar_pair) => { + BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => { let SignedBeaconBlockAndBlobsSidecar { beacon_block, blobs_sidecar, @@ -120,12 +143,18 @@ impl BlockWrapper { impl From> for BlockWrapper { fn from(block: SignedBeaconBlock) -> Self { - BlockWrapper::Block(Arc::new(block)) + BlockWrapper(BlockWrapperInner::Block(Arc::new(block))) } } impl From>> for BlockWrapper { fn from(block: Arc>) -> Self { - BlockWrapper::Block(block) + BlockWrapper(BlockWrapperInner::Block(block)) + } +} + +impl From> for BlockWrapper { + fn from(block: SignedBeaconBlockAndBlobsSidecar) -> Self { + BlockWrapper(BlockWrapperInner::BlockAndBlob(block)) } } From 
1931a442dc004d8664e4d6811d2e8c5bbc259ceb Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 28 Dec 2022 10:30:36 -0500 Subject: [PATCH 8/8] Revert "renames, remove , wrap BlockWrapper enum to make descontruction private" This reverts commit 5b3b34a9d75aa837cbaaf41104ed35641dfbd504. --- .../beacon_chain/src/block_verification.rs | 2 +- beacon_node/http_api/src/publish_blocks.rs | 41 +++--- .../src/rpc/codec/ssz_snappy.rs | 13 +- .../lighthouse_network/src/rpc/methods.rs | 8 +- .../src/service/api_types.rs | 4 +- .../lighthouse_network/src/service/mod.rs | 2 +- .../network/src/beacon_processor/mod.rs | 4 +- .../beacon_processor/worker/rpc_methods.rs | 4 +- .../beacon_processor/worker/sync_methods.rs | 9 +- beacon_node/network/src/router/processor.rs | 30 ++--- .../network/src/sync/backfill_sync/mod.rs | 2 +- .../src/sync/block_sidecar_coupling.rs | 20 ++- beacon_node/network/src/sync/manager.rs | 118 ++++++++++-------- .../network/src/sync/network_context.rs | 97 +++++++------- .../network/src/sync/range_sync/batch.rs | 62 +++++++-- .../network/src/sync/range_sync/chain.rs | 2 +- .../network/src/sync/range_sync/mod.rs | 4 +- .../network/src/sync/range_sync/range.rs | 6 +- consensus/types/src/signed_block_and_blobs.rs | 71 ++++------- 19 files changed, 268 insertions(+), 231 deletions(-) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 2b759e4ad..c8d3aed79 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1141,7 +1141,7 @@ impl IntoExecutionPendingBlock for Arc( // Send the block, regardless of whether or not it is valid. The API // specification is very clear that this is the desired behaviour. - let wrapped_block: BlockWrapper = - if matches!(block.as_ref(), &SignedBeaconBlock::Eip4844(_)) { - if let Some(sidecar) = chain.blob_cache.pop(&block_root) { - let block_and_blobs = SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: Arc::new(sidecar), - }; - crate::publish_pubsub_message( - network_tx, - PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs.clone()), - )?; - block_and_blobs.into() - } else { - //FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required - return Err(warp_utils::reject::broadcast_without_import(format!( - "no blob cached for block" - ))); - } + let wrapped_block = if matches!(block.as_ref(), &SignedBeaconBlock::Eip4844(_)) { + if let Some(sidecar) = chain.blob_cache.pop(&block_root) { + let block_and_blobs = SignedBeaconBlockAndBlobsSidecar { + beacon_block: block, + blobs_sidecar: Arc::new(sidecar), + }; + crate::publish_pubsub_message( + network_tx, + PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs.clone()), + )?; + BlockWrapper::BlockAndBlob(block_and_blobs) } else { - crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?; - block.into() - }; + //FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required + return Err(warp_utils::reject::broadcast_without_import(format!( + "no blob cached for block" + ))); + } + } else { + crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?; + BlockWrapper::Block(block) + }; // Determine the delay after the start of the slot, register it with metrics. 
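The publish path above only pairs the block with a sidecar when the block is an EIP-4844 block and a sidecar is still sitting in the blob cache; otherwise it either publishes the bare block or rejects the request. A simplified sketch of that branch (toy types; the real code also gossips the pubsub messages before wrapping):

use std::sync::Arc;

struct Block {
    is_eip4844: bool,
}
struct Sidecar;

enum BlockWrapper {
    Block(Arc<Block>),
    BlockAndBlob(Arc<Block>, Arc<Sidecar>),
}

// Mirrors the shape of the publish path: 4844 blocks must have a cached sidecar,
// pre-4844 blocks are imported on their own.
fn wrap_for_import(
    block: Arc<Block>,
    cached_sidecar: Option<Arc<Sidecar>>,
) -> Result<BlockWrapper, &'static str> {
    if block.is_eip4844 {
        match cached_sidecar {
            Some(sidecar) => Ok(BlockWrapper::BlockAndBlob(block, sidecar)),
            None => Err("no blob cached for block"),
        }
    } else {
        Ok(BlockWrapper::Block(block))
    }
}

fn main() {
    let pre_4844 = Arc::new(Block { is_eip4844: false });
    assert!(matches!(wrap_for_import(pre_4844, None), Ok(BlockWrapper::Block(_))));

    let post_4844 = Arc::new(Block { is_eip4844: true });
    assert!(wrap_for_import(post_4844, None).is_err());
}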
let block = wrapped_block.block(); diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index eb5cc7f27..df5350d94 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -73,7 +73,7 @@ impl Encoder> for SSZSnappyInboundCodec< RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(), - RPCResponse::BlockAndBlobsByRoot(res) => res.as_ssz_bytes(), + RPCResponse::BlobsByRoot(res) => res.as_ssz_bytes(), RPCResponse::LightClientBootstrap(res) => res.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => @@ -439,8 +439,7 @@ fn context_bytes( SignedBeaconBlock::Base { .. } => Some(fork_context.genesis_context_bytes()), }; } - if let RPCResponse::BlobsByRange(_) | RPCResponse::BlockAndBlobsByRoot(_) = rpc_variant - { + if let RPCResponse::BlobsByRange(_) | RPCResponse::BlobsByRoot(_) = rpc_variant { return fork_context.to_context_bytes(ForkName::Eip4844); } } @@ -586,7 +585,7 @@ fn handle_v1_response( )))), _ => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, - "Invalid fork name for blobs by range".to_string(), + "Invalid forkname for blobsbyrange".to_string(), )), } } @@ -598,12 +597,12 @@ fn handle_v1_response( ) })?; match fork_name { - ForkName::Eip4844 => Ok(Some(RPCResponse::BlockAndBlobsByRoot( + ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRoot(Arc::new( SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?, - ))), + )))), _ => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, - "Invalid fork name for block and blobs by root".to_string(), + "Invalid forkname for blobsbyroot".to_string(), )), } } diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 02e24d8e1..53e6b6759 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -281,7 +281,7 @@ pub enum RPCResponse { LightClientBootstrap(LightClientBootstrap), /// A response to a get BLOBS_BY_ROOT request. - BlockAndBlobsByRoot(SignedBeaconBlockAndBlobsSidecar), + BlobsByRoot(Arc>), /// A PONG response to a PING request. 
Pong(Ping), @@ -372,7 +372,7 @@ impl RPCCodedResponse { RPCResponse::BlocksByRange(_) => true, RPCResponse::BlocksByRoot(_) => true, RPCResponse::BlobsByRange(_) => true, - RPCResponse::BlockAndBlobsByRoot(_) => true, + RPCResponse::BlobsByRoot(_) => true, RPCResponse::Pong(_) => false, RPCResponse::MetaData(_) => false, RPCResponse::LightClientBootstrap(_) => false, @@ -409,7 +409,7 @@ impl RPCResponse { RPCResponse::BlocksByRange(_) => Protocol::BlocksByRange, RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, RPCResponse::BlobsByRange(_) => Protocol::BlobsByRange, - RPCResponse::BlockAndBlobsByRoot(_) => Protocol::BlobsByRoot, + RPCResponse::BlobsByRoot(_) => Protocol::BlobsByRoot, RPCResponse::Pong(_) => Protocol::Ping, RPCResponse::MetaData(_) => Protocol::MetaData, RPCResponse::LightClientBootstrap(_) => Protocol::LightClientBootstrap, @@ -449,7 +449,7 @@ impl std::fmt::Display for RPCResponse { RPCResponse::BlobsByRange(blob) => { write!(f, "BlobsByRange: Blob slot: {}", blob.beacon_block_slot) } - RPCResponse::BlockAndBlobsByRoot(blob) => { + RPCResponse::BlobsByRoot(blob) => { write!( f, "BlobsByRoot: Blob slot: {}", diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index c9c239d8c..b6a033020 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -83,7 +83,7 @@ pub enum Response { /// A response to a LightClientUpdate request. LightClientBootstrap(LightClientBootstrap), /// A response to a get BLOBS_BY_ROOT request. - BlobsByRoot(Option>), + BlobsByRoot(Option>>), } impl std::convert::From> for RPCCodedResponse { @@ -98,7 +98,7 @@ impl std::convert::From> for RPCCodedResponse RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange), }, Response::BlobsByRoot(r) => match r { - Some(b) => RPCCodedResponse::Success(RPCResponse::BlockAndBlobsByRoot(b)), + Some(b) => RPCCodedResponse::Success(RPCResponse::BlobsByRoot(b)), None => RPCCodedResponse::StreamTermination(ResponseTermination::BlobsByRoot), }, Response::BlobsByRange(r) => match r { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index d59bc4bfd..9adf7699b 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1315,7 +1315,7 @@ impl Network { RPCResponse::BlocksByRoot(resp) => { self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } - RPCResponse::BlockAndBlobsByRoot(resp) => { + RPCResponse::BlobsByRoot(resp) => { self.build_response(id, peer_id, Response::BlobsByRoot(Some(resp))) } // Should never be reached diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 37d6edef8..9de006c84 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -1699,7 +1699,7 @@ impl BeaconProcessor { message_id, peer_id, peer_client, - block.into(), + BlockWrapper::Block(block), work_reprocessing_tx, duplicate_cache, seen_timestamp, @@ -1721,7 +1721,7 @@ impl BeaconProcessor { message_id, peer_id, peer_client, - block_sidecar_pair.into(), + BlockWrapper::BlockAndBlob(block_sidecar_pair), work_reprocessing_tx, duplicate_cache, seen_timestamp, diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 69bd7da11..3ade1bb87 
100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -230,10 +230,10 @@ impl Worker { Ok((Some(block), Some(blobs))) => { self.send_response( peer_id, - Response::BlobsByRoot(Some(SignedBeaconBlockAndBlobsSidecar { + Response::BlobsByRoot(Some(Arc::new(SignedBeaconBlockAndBlobsSidecar { beacon_block: block, blobs_sidecar: blobs, - })), + }))), request_id, ); send_block_count += 1; diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index b3465c56d..5af62c37d 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -188,7 +188,14 @@ impl Worker { let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); let sent_blocks = downloaded_blocks.len(); - let unwrapped = downloaded_blocks.into_iter().map(|_| todo!()).collect(); + let unwrapped = downloaded_blocks + .into_iter() + .map(|block| match block { + BlockWrapper::Block(block) => block, + //FIXME(sean) handle blobs in backfill + BlockWrapper::BlockAndBlob(_) => todo!(), + }) + .collect(); match self.process_backfill_blocks(unwrapped) { (_, Ok(_)) => { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index d0879baba..5ee0e367b 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -223,10 +223,10 @@ impl Processor { SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => { unreachable!("Block lookups do not request BBRange requests") } - id @ (SyncId::BackFillBlocks { .. } - | SyncId::RangeBlocks { .. } - | SyncId::BackFillBlobs { .. } - | SyncId::RangeBlobs { .. }) => id, + id @ (SyncId::BackFillSync { .. } + | SyncId::RangeSync { .. } + | SyncId::BackFillSidecarPair { .. } + | SyncId::RangeSidecarPair { .. }) => id, }, RequestId::Router => unreachable!("All BBRange requests belong to sync"), }; @@ -258,7 +258,7 @@ impl Processor { ); if let RequestId::Sync(id) = request_id { - self.send_to_sync(SyncMessage::RpcBlobs { + self.send_to_sync(SyncMessage::RpcGlob { peer_id, request_id: id, blob_sidecar, @@ -282,10 +282,10 @@ impl Processor { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, - SyncId::BackFillBlocks { .. } - | SyncId::RangeBlocks { .. } - | SyncId::RangeBlobs { .. } - | SyncId::BackFillBlobs { .. } => { + SyncId::BackFillSync { .. } + | SyncId::RangeSync { .. } + | SyncId::RangeSidecarPair { .. } + | SyncId::BackFillSidecarPair { .. } => { unreachable!("Batch syncing do not request BBRoot requests") } }, @@ -310,15 +310,15 @@ impl Processor { &mut self, peer_id: PeerId, request_id: RequestId, - block_and_blobs: Option>, + block_and_blobs: Option>>, ) { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, - SyncId::BackFillBlocks { .. } - | SyncId::RangeBlocks { .. } - | SyncId::RangeBlobs { .. } - | SyncId::BackFillBlobs { .. } => { + SyncId::BackFillSync { .. } + | SyncId::RangeSync { .. } + | SyncId::RangeSidecarPair { .. } + | SyncId::BackFillSidecarPair { .. 
} => { unreachable!("Batch syncing does not request BBRoot requests") } }, @@ -330,7 +330,7 @@ impl Processor { "Received BlockAndBlobssByRoot Response"; "peer" => %peer_id, ); - self.send_to_sync(SyncMessage::RpcBlockAndBlobs { + self.send_to_sync(SyncMessage::RpcBlockAndGlob { peer_id, request_id, block_and_blobs, diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index ad1bfb1d4..56ed55153 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -536,7 +536,7 @@ impl BackFillSync { let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id); self.current_processing_batch = Some(batch_id); - let work_event = BeaconWorkEvent::chain_segment(process_id, blocks); + let work_event = BeaconWorkEvent::chain_segment(process_id, blocks.into_wrapped_blocks()); if let Err(e) = network.processor_channel().try_send(work_event) { crit!(self.log, "Failed to send backfill segment to processor."; "msg" => "process_batch", "error" => %e, "batch" => self.processing_target); diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 46ac5bd0f..f82417db3 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,9 +1,12 @@ use std::{collections::VecDeque, sync::Arc}; -use types::{signed_block_and_blobs::BlockWrapper, BlobsSidecar, EthSpec, SignedBeaconBlock}; +use types::{ + signed_block_and_blobs::BlockWrapper, BlobsSidecar, EthSpec, SignedBeaconBlock, + SignedBeaconBlockAndBlobsSidecar, +}; #[derive(Debug, Default)] -pub struct BlocksAndBlobsRequestInfo { +pub struct BlockBlobRequestInfo { /// Blocks we have received awaiting for their corresponding sidecar. accumulated_blocks: VecDeque>>, /// Sidecars we have received awaiting for their corresponding block. @@ -14,7 +17,7 @@ pub struct BlocksAndBlobsRequestInfo { is_sidecars_stream_terminated: bool, } -impl BlocksAndBlobsRequestInfo { +impl BlockBlobRequestInfo { pub fn add_block_response(&mut self, maybe_block: Option>>) { match maybe_block { Some(block) => self.accumulated_blocks.push_back(block), @@ -30,7 +33,7 @@ impl BlocksAndBlobsRequestInfo { } pub fn into_responses(self) -> Result>, &'static str> { - let BlocksAndBlobsRequestInfo { + let BlockBlobRequestInfo { accumulated_blocks, mut accumulated_sidecars, .. 
@@ -48,9 +51,14 @@ impl BlocksAndBlobsRequestInfo { { let blobs_sidecar = accumulated_sidecars.pop_front().ok_or("missing sidecar")?; - Ok(BlockWrapper::new_with_blobs(beacon_block, blobs_sidecar)) + Ok(BlockWrapper::BlockAndBlob( + SignedBeaconBlockAndBlobsSidecar { + beacon_block, + blobs_sidecar, + }, + )) } else { - Ok(beacon_block.into()) + Ok(BlockWrapper::Block(beacon_block)) } }) .collect::, _>>(); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 5da203e0e..29838bde7 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,13 +35,13 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; -use super::network_context::{BlockOrBlobs, SyncNetworkContext}; +use super::network_context::{BlockOrBlob, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; -use crate::sync::range_sync::ByRangeRequestType; +use crate::sync::range_sync::ExpectedBatchTy; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; use futures::StreamExt; use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; @@ -79,13 +79,13 @@ pub enum RequestId { /// Request searching for a block's parent. The id is the chain ParentLookup { id: Id }, /// Request was from the backfill sync algorithm. - BackFillBlocks { id: Id }, - /// Backfill request for blob sidecars. - BackFillBlobs { id: Id }, + BackFillSync { id: Id }, + /// Backfill request for blocks and sidecars. + BackFillSidecarPair { id: Id }, /// The request was from a chain in the range sync algorithm. - RangeBlocks { id: Id }, - /// The request was from a chain in range, asking for ranges blob sidecars. - RangeBlobs { id: Id }, + RangeSync { id: Id }, + /// The request was from a chain in range, asking for ranges of blocks and sidecars. + RangeSidecarPair { id: Id }, } #[derive(Debug)] @@ -103,7 +103,7 @@ pub enum SyncMessage { }, /// A blob has been received from the RPC. - RpcBlobs { + RpcGlob { request_id: RequestId, peer_id: PeerId, blob_sidecar: Option>>, @@ -111,10 +111,10 @@ pub enum SyncMessage { }, /// A block and blobs have been received from the RPC. 
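Stepping back to the coupling logic above: once both RPC streams have terminated, the accumulated blocks are walked in order and a sidecar is attached whenever the front of the sidecar queue carries the same slot. The sketch below approximates that pairing under simplified stand-in types; the real code also validates that no sidecars are left over and reports the failure to sync:

use std::collections::VecDeque;

// Toy stand-ins: only the slot matters for the pairing decision.
struct Block {
    slot: u64,
}
struct Sidecar {
    beacon_block_slot: u64,
}

enum Wrapped {
    Block(Block),
    BlockAndBlobs(Block, Sidecar),
}

// Pair each block with the front sidecar when the slots match, otherwise
// pass the block through on its own (pre-4844 or blobless blocks).
fn into_responses(
    blocks: VecDeque<Block>,
    mut sidecars: VecDeque<Sidecar>,
) -> Result<Vec<Wrapped>, &'static str> {
    let mut out = Vec::with_capacity(blocks.len());
    for block in blocks {
        let matches = sidecars
            .front()
            .map(|s| s.beacon_block_slot == block.slot)
            .unwrap_or(false);
        if matches {
            let sidecar = sidecars.pop_front().ok_or("missing sidecar")?;
            out.push(Wrapped::BlockAndBlobs(block, sidecar));
        } else {
            out.push(Wrapped::Block(block));
        }
    }
    // Any sidecar left over had no matching block, which marks the response as faulty.
    if sidecars.is_empty() {
        Ok(out)
    } else {
        Err("unexpected blobs")
    }
}

fn main() {
    let blocks = VecDeque::from(vec![Block { slot: 1 }, Block { slot: 2 }]);
    let sidecars = VecDeque::from(vec![Sidecar { beacon_block_slot: 2 }]);
    let paired = into_responses(blocks, sidecars).unwrap();
    assert!(matches!(paired[0], Wrapped::Block(_)));
    assert!(matches!(paired[1], Wrapped::BlockAndBlobs(_, _)));
}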
- RpcBlockAndBlobs { + RpcBlockAndGlob { request_id: RequestId, peer_id: PeerId, - block_and_blobs: Option>, + block_and_blobs: Option>>, seen_timestamp: Duration, }, @@ -295,10 +295,10 @@ impl SyncManager { self.block_lookups .parent_lookup_failed(id, peer_id, &mut self.network, error); } - RequestId::BackFillBlocks { id } => { + RequestId::BackFillSync { id } => { if let Some(batch_id) = self .network - .backfill_request_failed(id, ByRangeRequestType::Blocks) + .backfill_request_failed(id, ExpectedBatchTy::OnlyBlock) { match self .backfill_sync @@ -310,10 +310,10 @@ impl SyncManager { } } - RequestId::BackFillBlobs { id } => { + RequestId::BackFillSidecarPair { id } => { if let Some(batch_id) = self .network - .backfill_request_failed(id, ByRangeRequestType::BlocksAndBlobs) + .backfill_request_failed(id, ExpectedBatchTy::OnlyBlockBlobs) { match self .backfill_sync @@ -324,10 +324,10 @@ impl SyncManager { } } } - RequestId::RangeBlocks { id } => { + RequestId::RangeSync { id } => { if let Some((chain_id, batch_id)) = self .network - .range_sync_request_failed(id, ByRangeRequestType::Blocks) + .range_sync_request_failed(id, ExpectedBatchTy::OnlyBlock) { self.range_sync.inject_error( &mut self.network, @@ -339,10 +339,10 @@ impl SyncManager { self.update_sync_state() } } - RequestId::RangeBlobs { id } => { + RequestId::RangeSidecarPair { id } => { if let Some((chain_id, batch_id)) = self .network - .range_sync_request_failed(id, ByRangeRequestType::BlocksAndBlobs) + .range_sync_request_failed(id, ExpectedBatchTy::OnlyBlockBlobs) { self.range_sync.inject_error( &mut self.network, @@ -648,18 +648,18 @@ impl SyncManager { .block_lookups .parent_chain_processed(chain_hash, result, &mut self.network), }, - SyncMessage::RpcBlobs { + SyncMessage::RpcGlob { request_id, peer_id, blob_sidecar, seen_timestamp, - } => self.rpc_blobs_received(request_id, peer_id, blob_sidecar, seen_timestamp), - SyncMessage::RpcBlockAndBlobs { + } => self.rpc_sidecar_received(request_id, peer_id, blob_sidecar, seen_timestamp), + SyncMessage::RpcBlockAndGlob { request_id, peer_id, block_and_blobs, seen_timestamp, - } => self.rpc_block_block_and_blobs_received( + } => self.rpc_block_sidecar_pair_received( request_id, peer_id, block_and_blobs, @@ -734,18 +734,18 @@ impl SyncManager { RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( id, peer_id, - beacon_block.map(|block| block.into()), + beacon_block.map(|block| BlockWrapper::Block(block)), seen_timestamp, &mut self.network, ), RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( id, peer_id, - beacon_block.map(|block| block.into()), + beacon_block.map(|block| BlockWrapper::Block(block)), seen_timestamp, &mut self.network, ), - RequestId::BackFillBlocks { id } => { + RequestId::BackFillSync { id } => { let is_stream_terminator = beacon_block.is_none(); if let Some(batch_id) = self .network @@ -756,7 +756,7 @@ impl SyncManager { batch_id, &peer_id, id, - beacon_block.map(|block| block.into()), + beacon_block.map(|block| BlockWrapper::Block(block)), ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::Successful) => {} @@ -768,7 +768,7 @@ impl SyncManager { } } } - RequestId::RangeBlocks { id } => { + RequestId::RangeSync { id } => { let is_stream_terminator = beacon_block.is_none(); if let Some((chain_id, batch_id)) = self .network @@ -780,28 +780,28 @@ impl SyncManager { chain_id, batch_id, id, - beacon_block.map(|block| block.into()), + beacon_block.map(|block| 
BlockWrapper::Block(block)), ); self.update_sync_state(); } } - RequestId::BackFillBlobs { id } => { - self.blobs_backfill_response(id, peer_id, beacon_block.into()) + RequestId::BackFillSidecarPair { id } => { + self.block_blob_backfill_response(id, peer_id, beacon_block.into()) } - RequestId::RangeBlobs { id } => { - self.blobs_range_response(id, peer_id, beacon_block.into()) + RequestId::RangeSidecarPair { id } => { + self.block_blob_range_response(id, peer_id, beacon_block.into()) } } } /// Handles receiving a response for a range sync request that should have both blocks and /// blobs. - fn blobs_range_response( + fn block_blob_range_response( &mut self, id: Id, peer_id: PeerId, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) { if let Some((chain_id, batch_id, block_responses)) = self .network @@ -834,7 +834,7 @@ impl SyncManager { "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e ); // TODO: penalize the peer for being a bad boy - let id = RequestId::RangeBlobs { id }; + let id = RequestId::RangeSidecarPair { id }; self.inject_error(peer_id, id, RPCError::InvalidData(e.into())) } } @@ -843,11 +843,11 @@ impl SyncManager { /// Handles receiving a response for a Backfill sync request that should have both blocks and /// blobs. - fn blobs_backfill_response( + fn block_blob_backfill_response( &mut self, id: Id, peer_id: PeerId, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) { if let Some((batch_id, block_responses)) = self .network @@ -886,14 +886,14 @@ impl SyncManager { "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e ); // TODO: penalize the peer for being a bad boy - let id = RequestId::BackFillBlobs { id }; + let id = RequestId::BackFillSidecarPair { id }; self.inject_error(peer_id, id, RPCError::InvalidData(e.into())) } } } } - fn rpc_blobs_received( + fn rpc_sidecar_received( &mut self, request_id: RequestId, peer_id: PeerId, @@ -904,47 +904,57 @@ impl SyncManager { RequestId::SingleBlock { .. } | RequestId::ParentLookup { .. } => { unreachable!("There is no such thing as a singular 'by root' glob request that is not accompanied by the block") } - RequestId::BackFillBlocks { .. } => { + RequestId::BackFillSync { .. } => { unreachable!("An only blocks request does not receive sidecars") } - RequestId::BackFillBlobs { id } => { - self.blobs_backfill_response(id, peer_id, maybe_sidecar.into()) + RequestId::BackFillSidecarPair { id } => { + self.block_blob_backfill_response(id, peer_id, maybe_sidecar.into()) } - RequestId::RangeBlocks { .. } => { + RequestId::RangeSync { .. 
} => { unreachable!("Only-blocks range requests don't receive sidecars") } - RequestId::RangeBlobs { id } => { - self.blobs_range_response(id, peer_id, maybe_sidecar.into()) + RequestId::RangeSidecarPair { id } => { + self.block_blob_range_response(id, peer_id, maybe_sidecar.into()) } } } - fn rpc_block_block_and_blobs_received( + fn rpc_block_sidecar_pair_received( &mut self, request_id: RequestId, peer_id: PeerId, - block_sidecar_pair: Option>, + block_sidecar_pair: Option>>, seen_timestamp: Duration, ) { match request_id { RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( id, peer_id, - block_sidecar_pair.map(|block_sidecar_pair| block_sidecar_pair.into()), + block_sidecar_pair.map(|block_sidecar_pair| { + BlockWrapper::BlockAndBlob( + // TODO: why is this in an arc + (*block_sidecar_pair).clone(), + ) + }), seen_timestamp, &mut self.network, ), RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( id, peer_id, - block_sidecar_pair.map(|block_sidecar_pair| block_sidecar_pair.into()), + block_sidecar_pair.map(|block_sidecar_pair| { + BlockWrapper::BlockAndBlob( + // TODO: why is this in an arc + (*block_sidecar_pair).clone(), + ) + }), seen_timestamp, &mut self.network, ), - RequestId::BackFillBlocks { .. } - | RequestId::BackFillBlobs { .. } - | RequestId::RangeBlocks { .. } - | RequestId::RangeBlobs { .. } => unreachable!( + RequestId::BackFillSync { .. } + | RequestId::BackFillSidecarPair { .. } + | RequestId::RangeSync { .. } + | RequestId::RangeSidecarPair { .. } => unreachable!( "since range requests are not block-glob coupled, this should never be reachable" ), } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index c54b3b1a9..36da3bf82 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,9 +1,9 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. -use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; +use super::block_sidecar_coupling::BlockBlobRequestInfo; use super::manager::{Id, RequestId as SyncRequestId}; -use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; +use super::range_sync::{BatchId, ChainId, ExpectedBatchTy}; use crate::beacon_processor::WorkEvent; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; @@ -38,12 +38,11 @@ pub struct SyncNetworkContext { backfill_requests: FnvHashMap, /// BlocksByRange requests paired with BlobsByRange requests made by the range. - range_blocks_and_blobs_requests: - FnvHashMap)>, + range_sidecar_pair_requests: + FnvHashMap)>, /// BlocksByRange requests paired with BlobsByRange requests made by the backfill sync. - backfill_blocks_and_blobs_requests: - FnvHashMap)>, + backfill_sidecar_pair_requests: FnvHashMap)>, /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. @@ -59,20 +58,20 @@ pub struct SyncNetworkContext { } /// Small enumeration to make dealing with block and blob requests easier. 
-pub enum BlockOrBlobs { +pub enum BlockOrBlob { Block(Option>>), - Blobs(Option>>), + Blob(Option>>), } -impl From>>> for BlockOrBlobs { +impl From>>> for BlockOrBlob { fn from(block: Option>>) -> Self { - BlockOrBlobs::Block(block) + BlockOrBlob::Block(block) } } -impl From>>> for BlockOrBlobs { +impl From>>> for BlockOrBlob { fn from(blob: Option>>) -> Self { - BlockOrBlobs::Blobs(blob) + BlockOrBlob::Blob(blob) } } @@ -90,8 +89,8 @@ impl SyncNetworkContext { request_id: 1, range_requests: Default::default(), backfill_requests: Default::default(), - range_blocks_and_blobs_requests: Default::default(), - backfill_blocks_and_blobs_requests: Default::default(), + range_sidecar_pair_requests: Default::default(), + backfill_sidecar_pair_requests: Default::default(), execution_engine_state: EngineState::Online, // always assume `Online` at the start beacon_processor_send, chain, @@ -141,13 +140,13 @@ impl SyncNetworkContext { pub fn blocks_by_range_request( &mut self, peer_id: PeerId, - batch_type: ByRangeRequestType, + batch_type: ExpectedBatchTy, request: BlocksByRangeRequest, chain_id: ChainId, batch_id: BatchId, ) -> Result { match batch_type { - ByRangeRequestType::Blocks => { + ExpectedBatchTy::OnlyBlock => { trace!( self.log, "Sending BlocksByRange request"; @@ -157,7 +156,7 @@ impl SyncNetworkContext { ); let request = Request::BlocksByRange(request); let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::RangeBlocks { id }); + let request_id = RequestId::Sync(SyncRequestId::RangeSync { id }); self.send_network_msg(NetworkMessage::SendRequest { peer_id, request, @@ -166,7 +165,7 @@ impl SyncNetworkContext { self.range_requests.insert(id, (chain_id, batch_id)); Ok(id) } - ByRangeRequestType::BlocksAndBlobs => { + ExpectedBatchTy::OnlyBlockBlobs => { debug!( self.log, "Sending BlocksByRange and BlobsByRange requests"; @@ -177,7 +176,7 @@ impl SyncNetworkContext { // create the shared request id. This is fine since the rpc handles substream ids. let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::RangeBlobs { id }); + let request_id = RequestId::Sync(SyncRequestId::RangeSidecarPair { id }); // Create the blob request based on the blob request. 
let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { @@ -197,8 +196,8 @@ impl SyncNetworkContext { request: blobs_request, request_id, })?; - let block_blob_info = BlocksAndBlobsRequestInfo::default(); - self.range_blocks_and_blobs_requests + let block_blob_info = BlockBlobRequestInfo::default(); + self.range_sidecar_pair_requests .insert(id, (chain_id, batch_id, block_blob_info)); Ok(id) } @@ -209,12 +208,12 @@ impl SyncNetworkContext { pub fn backfill_blocks_by_range_request( &mut self, peer_id: PeerId, - batch_type: ByRangeRequestType, + batch_type: ExpectedBatchTy, request: BlocksByRangeRequest, batch_id: BatchId, ) -> Result { match batch_type { - ByRangeRequestType::Blocks => { + ExpectedBatchTy::OnlyBlock => { trace!( self.log, "Sending backfill BlocksByRange request"; @@ -224,7 +223,7 @@ impl SyncNetworkContext { ); let request = Request::BlocksByRange(request); let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::BackFillBlocks { id }); + let request_id = RequestId::Sync(SyncRequestId::BackFillSync { id }); self.send_network_msg(NetworkMessage::SendRequest { peer_id, request, @@ -233,7 +232,7 @@ impl SyncNetworkContext { self.backfill_requests.insert(id, batch_id); Ok(id) } - ByRangeRequestType::BlocksAndBlobs => { + ExpectedBatchTy::OnlyBlockBlobs => { debug!( self.log, "Sending backfill BlocksByRange and BlobsByRange requests"; @@ -244,7 +243,7 @@ impl SyncNetworkContext { // create the shared request id. This is fine since the rpc handles substream ids. let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::BackFillBlobs { id }); + let request_id = RequestId::Sync(SyncRequestId::BackFillSidecarPair { id }); // Create the blob request based on the blob request. let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { @@ -264,8 +263,8 @@ impl SyncNetworkContext { request: blobs_request, request_id, })?; - let block_blob_info = BlocksAndBlobsRequestInfo::default(); - self.backfill_blocks_and_blobs_requests + let block_blob_info = BlockBlobRequestInfo::default(); + self.backfill_sidecar_pair_requests .insert(id, (batch_id, block_blob_info)); Ok(id) } @@ -289,18 +288,18 @@ impl SyncNetworkContext { pub fn range_sync_block_and_blob_response( &mut self, request_id: Id, - block_or_blob: BlockOrBlobs, + block_or_blob: BlockOrBlob, ) -> Option<( ChainId, BatchId, Result>, &'static str>, )> { - match self.range_blocks_and_blobs_requests.entry(request_id) { + match self.range_sidecar_pair_requests.entry(request_id) { Entry::Occupied(mut entry) => { let (_, _, info) = entry.get_mut(); match block_or_blob { - BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), + BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything @@ -317,28 +316,28 @@ impl SyncNetworkContext { pub fn range_sync_request_failed( &mut self, request_id: Id, - batch_type: ByRangeRequestType, + batch_type: ExpectedBatchTy, ) -> Option<(ChainId, BatchId)> { match batch_type { - ByRangeRequestType::BlocksAndBlobs => self - .range_blocks_and_blobs_requests + ExpectedBatchTy::OnlyBlockBlobs => self + .range_sidecar_pair_requests .remove(&request_id) .map(|(chain_id, batch_id, _info)| (chain_id, batch_id)), - ByRangeRequestType::Blocks => self.range_requests.remove(&request_id), + 
+            ExpectedBatchTy::OnlyBlock => self.range_requests.remove(&request_id),
         }
     }
 
     pub fn backfill_request_failed(
         &mut self,
         request_id: Id,
-        batch_type: ByRangeRequestType,
+        batch_type: ExpectedBatchTy,
     ) -> Option<BatchId> {
         match batch_type {
-            ByRangeRequestType::BlocksAndBlobs => self
-                .backfill_blocks_and_blobs_requests
+            ExpectedBatchTy::OnlyBlockBlobs => self
+                .backfill_sidecar_pair_requests
                 .remove(&request_id)
                 .map(|(batch_id, _info)| batch_id),
-            ByRangeRequestType::Blocks => self.backfill_requests.remove(&request_id),
+            ExpectedBatchTy::OnlyBlock => self.backfill_requests.remove(&request_id),
         }
     }
 
@@ -361,14 +360,14 @@ impl SyncNetworkContext {
     pub fn backfill_sync_block_and_blob_response(
         &mut self,
         request_id: Id,
-        block_or_blob: BlockOrBlobs<T::EthSpec>,
+        block_or_blob: BlockOrBlob<T::EthSpec>,
     ) -> Option<(BatchId, Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>)> {
-        match self.backfill_blocks_and_blobs_requests.entry(request_id) {
+        match self.backfill_sidecar_pair_requests.entry(request_id) {
             Entry::Occupied(mut entry) => {
                 let (_, info) = entry.get_mut();
                 match block_or_blob {
-                    BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block),
-                    BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
+                    BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
+                    BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
                 }
                 if info.is_finished() {
                     // If the request is finished, dequeue everything
@@ -534,7 +533,7 @@ impl SyncNetworkContext {
 
     /// Check whether a batch for this epoch (and only this epoch) should request just blocks or
     /// blocks and blobs.
-    pub fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType {
+    pub fn batch_type(&self, epoch: types::Epoch) -> ExpectedBatchTy {
         const _: () = assert!(
             super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1
                 && super::range_sync::EPOCHS_PER_BATCH == 1,
@@ -543,18 +542,18 @@ impl SyncNetworkContext {
         #[cfg(test)]
         {
             // Keep tests only for blocks.
-            return ByRangeRequestType::Blocks;
+            return ExpectedBatchTy::OnlyBlock;
         }
         #[cfg(not(test))]
         {
             if let Some(data_availability_boundary) = self.chain.data_availability_boundary() {
                 if epoch >= data_availability_boundary {
-                    ByRangeRequestType::BlocksAndBlobs
+                    ExpectedBatchTy::OnlyBlockBlobs
                 } else {
-                    ByRangeRequestType::Blocks
+                    ExpectedBatchTy::OnlyBlock
                 }
             } else {
-                ByRangeRequestType::Blocks
+                ExpectedBatchTy::OnlyBlock
             }
         }
     }
diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs
index 184dcffc4..80f34f8b4 100644
--- a/beacon_node/network/src/sync/range_sync/batch.rs
+++ b/beacon_node/network/src/sync/range_sync/batch.rs
@@ -4,9 +4,10 @@ use lighthouse_network::PeerId;
 use std::collections::HashSet;
 use std::hash::{Hash, Hasher};
 use std::ops::Sub;
+use std::sync::Arc;
 use strum::Display;
 use types::signed_block_and_blobs::BlockWrapper;
-use types::{Epoch, EthSpec, Slot};
+use types::{Epoch, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot};
 
 /// The number of times to retry a batch before it is considered failed.
 const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5;
@@ -15,12 +16,36 @@ const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5;
 /// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty.
 const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3;
 
+pub enum BatchTy<T: EthSpec> {
+    Blocks(Vec<Arc<SignedBeaconBlock<T>>>),
+    BlocksAndBlobs(Vec<SignedBeaconBlockAndBlobsSidecar<T>>),
+}
+
+impl<T: EthSpec> BatchTy<T> {
+    pub fn into_wrapped_blocks(self) -> Vec<BlockWrapper<T>> {
+        match self {
+            BatchTy::Blocks(blocks) => blocks
+                .into_iter()
+                .map(|block| BlockWrapper::Block(block))
+                .collect(),
+            BatchTy::BlocksAndBlobs(block_sidecar_pair) => block_sidecar_pair
+                .into_iter()
+                .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob(block_sidecar_pair))
+                .collect(),
+        }
+    }
+}
+
+/// Error representing a batch with mixed block types.
+#[derive(Debug)]
+pub struct MixedBlockTyErr;
+
 /// Type of expected batch.
 #[derive(Debug, Copy, Clone, Display)]
 #[strum(serialize_all = "snake_case")]
-pub enum ByRangeRequestType {
-    BlocksAndBlobs,
-    Blocks,
+pub enum ExpectedBatchTy {
+    OnlyBlockBlobs,
+    OnlyBlock,
 }
 
 /// Allows customisation of the above constants used in other sync methods such as BackFillSync.
@@ -106,7 +131,7 @@ pub struct BatchInfo {
     /// State of the batch.
     state: BatchState<T>,
     /// Whether this batch contains all blocks or all blocks and blobs.
-    batch_type: ByRangeRequestType,
+    batch_type: ExpectedBatchTy,
     /// Pin the generic
     marker: std::marker::PhantomData<B>,
 }
@@ -155,7 +180,7 @@ impl BatchInfo {
     /// fork boundary will be of mixed type (all blocks and one last blockblob), and I don't want to
     /// deal with this for now.
     /// This means finalization might be slower in eip4844
-    pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ByRangeRequestType) -> Self {
+    pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ExpectedBatchTy) -> Self {
         let start_slot = start_epoch.start_slot(T::slots_per_epoch());
         let end_slot = start_slot + num_of_epochs * T::slots_per_epoch();
         BatchInfo {
@@ -218,7 +243,7 @@ impl BatchInfo {
     }
 
     /// Returns a BlocksByRange request associated with the batch.
-    pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) {
+    pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ExpectedBatchTy) {
         (
             BlocksByRangeRequest {
                 start_slot: self.start_slot.into(),
@@ -383,11 +408,30 @@ impl BatchInfo {
         }
     }
 
-    pub fn start_processing(&mut self) -> Result<Vec<BlockWrapper<T>>, WrongState> {
+    pub fn start_processing(&mut self) -> Result<BatchTy<T>, WrongState> {
         match self.state.poison() {
             BatchState::AwaitingProcessing(peer, blocks) => {
                 self.state = BatchState::Processing(Attempt::new::<B, T>(peer, &blocks));
-                Ok(blocks)
+                match self.batch_type {
+                    ExpectedBatchTy::OnlyBlockBlobs => {
+                        let blocks = blocks.into_iter().map(|block| {
+                            let BlockWrapper::BlockAndBlob(block_and_blob) = block else {
+                                panic!("Batches should never have a mixed type. This is a bug. Contact D")
+                            };
+                            block_and_blob
+                        }).collect();
+                        Ok(BatchTy::BlocksAndBlobs(blocks))
+                    }
+                    ExpectedBatchTy::OnlyBlock => {
+                        let blocks = blocks.into_iter().map(|block| {
+                            let BlockWrapper::Block(block) = block else {
+                                panic!("Batches should never have a mixed type. This is a bug. Contact D")
+                            };
+                            block
+                        }).collect();
+                        Ok(BatchTy::Blocks(blocks))
+                    }
+                }
             }
             BatchState::Poisoned => unreachable!("Poisoned batch"),
             other => {
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index d60de3224..89e120050 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -332,7 +332,7 @@ impl SyncingChain {
 
             let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized);
             self.current_processing_batch = Some(batch_id);
 
-            let work_event = BeaconWorkEvent::chain_segment(process_id, blocks);
+            let work_event = BeaconWorkEvent::chain_segment(process_id, blocks.into_wrapped_blocks());
             if let Err(e) = beacon_processor_send.try_send(work_event) {
                 crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch",
diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs
index d0f2f9217..284260321 100644
--- a/beacon_node/network/src/sync/range_sync/mod.rs
+++ b/beacon_node/network/src/sync/range_sync/mod.rs
@@ -9,8 +9,8 @@ mod range;
 mod sync_type;
 
 pub use batch::{
-    BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
-    ByRangeRequestType,
+    BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, BatchTy,
+    ExpectedBatchTy,
 };
 pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH};
 pub use range::RangeSync;
diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs
index 09d93b0e8..1e3474fa5 100644
--- a/beacon_node/network/src/sync/range_sync/range.rs
+++ b/beacon_node/network/src/sync/range_sync/range.rs
@@ -373,7 +373,7 @@ where
 #[cfg(test)]
 mod tests {
     use crate::service::RequestId;
-    use crate::sync::range_sync::ByRangeRequestType;
+    use crate::sync::range_sync::ExpectedBatchTy;
     use crate::NetworkMessage;
 
     use super::*;
@@ -686,7 +686,7 @@ mod tests {
         let (peer1, local_info, head_info) = rig.head_peer();
         range.add_peer(&mut rig.cx, local_info, peer1, head_info);
         let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 {
-            RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => {
+            RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => {
                 (rig.cx.range_sync_response(id, true).unwrap(), id)
             }
             other => panic!("unexpected request {:?}", other),
@@ -705,7 +705,7 @@ mod tests {
         let (peer2, local_info, finalized_info) = rig.finalized_peer();
         range.add_peer(&mut rig.cx, local_info, peer2, finalized_info);
         let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 {
-            RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => {
+            RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => {
                 (rig.cx.range_sync_response(id, true).unwrap(), id)
             }
             other => panic!("unexpected request {:?}", other),
diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs
index c589fbcfe..f21545f27 100644
--- a/consensus/types/src/signed_block_and_blobs.rs
+++ b/consensus/types/src/signed_block_and_blobs.rs
@@ -34,56 +34,33 @@ impl SignedBeaconBlockAndBlobsSidecar {
     }
 }
 
-/// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobsSidecar`]. This newtype
-/// wraps the `BlockWrapperInner` to ensure blobs cannot be accessed via an enum match. This would
-/// circumvent empty blob reconstruction when accessing blobs.
-#[derive(Clone, Debug, Derivative)]
-#[derivative(PartialEq, Hash(bound = "T: EthSpec"))]
-pub struct BlockWrapper<T: EthSpec>(BlockWrapperInner<T>);
-
 /// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobsSidecar`].
 #[derive(Clone, Debug, Derivative)]
 #[derivative(PartialEq, Hash(bound = "T: EthSpec"))]
-pub enum BlockWrapperInner<T: EthSpec> {
+pub enum BlockWrapper<T: EthSpec> {
     Block(Arc<SignedBeaconBlock<T>>),
     BlockAndBlob(SignedBeaconBlockAndBlobsSidecar<T>),
 }
 
 impl<T: EthSpec> BlockWrapper<T> {
-    pub fn new(block: Arc<SignedBeaconBlock<T>>) -> Self {
-        Self(BlockWrapperInner::Block(block))
-    }
-
-    pub fn new_with_blobs(
-        beacon_block: Arc<SignedBeaconBlock<T>>,
-        blobs_sidecar: Arc<BlobsSidecar<T>>,
-    ) -> Self {
-        Self(BlockWrapperInner::BlockAndBlob(
-            SignedBeaconBlockAndBlobsSidecar {
-                beacon_block,
-                blobs_sidecar,
-            },
-        ))
-    }
-
     pub fn slot(&self) -> Slot {
-        match &self.0 {
-            BlockWrapperInner::Block(block) => block.slot(),
-            BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => {
+        match self {
+            BlockWrapper::Block(block) => block.slot(),
+            BlockWrapper::BlockAndBlob(block_sidecar_pair) => {
                 block_sidecar_pair.beacon_block.slot()
             }
         }
     }
 
     pub fn block(&self) -> &SignedBeaconBlock<T> {
-        match &self.0 {
-            BlockWrapperInner::Block(block) => &block,
-            BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => &block_sidecar_pair.beacon_block,
+        match self {
+            BlockWrapper::Block(block) => &block,
+            BlockWrapper::BlockAndBlob(block_sidecar_pair) => &block_sidecar_pair.beacon_block,
         }
     }
 
     pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<T>> {
-        match &self.0 {
-            BlockWrapperInner::Block(block) => block.clone(),
-            BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => {
+        match self {
+            BlockWrapper::Block(block) => block.clone(),
+            BlockWrapper::BlockAndBlob(block_sidecar_pair) => {
                 block_sidecar_pair.beacon_block.clone()
             }
         }
@@ -93,20 +70,20 @@ impl BlockWrapper {
         &self,
         block_root: Option<Hash256>,
     ) -> Result<Option<Arc<BlobsSidecar<T>>>, BlobReconstructionError> {
-        match &self.0 {
-            BlockWrapperInner::Block(block) => block
+        match self {
+            BlockWrapper::Block(block) => block
                 .reconstruct_empty_blobs(block_root)
                 .map(|blob_opt| blob_opt.map(Arc::new)),
-            BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => {
+            BlockWrapper::BlockAndBlob(block_sidecar_pair) => {
                 Ok(Some(block_sidecar_pair.blobs_sidecar.clone()))
             }
         }
    }
     pub fn message(&self) -> crate::BeaconBlockRef<T> {
-        match &self.0 {
-            BlockWrapperInner::Block(block) => block.message(),
-            BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => {
+        match self {
+            BlockWrapper::Block(block) => block.message(),
+            BlockWrapper::BlockAndBlob(block_sidecar_pair) => {
                 block_sidecar_pair.beacon_block.message()
             }
         }
     }
@@ -123,14 +100,14 @@ impl BlockWrapper {
         Arc<SignedBeaconBlock<T>>,
         Result<Option<Arc<BlobsSidecar<T>>>, BlobReconstructionError>,
     ) {
-        match self.0 {
-            BlockWrapperInner::Block(block) => {
+        match self {
+            BlockWrapper::Block(block) => {
                 let blobs = block
                     .reconstruct_empty_blobs(block_root)
                     .map(|blob_opt| blob_opt.map(Arc::new));
                 (block, blobs)
             }
-            BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => {
+            BlockWrapper::BlockAndBlob(block_sidecar_pair) => {
                 let SignedBeaconBlockAndBlobsSidecar {
                     beacon_block,
                     blobs_sidecar,
@@ -143,18 +120,12 @@ impl BlockWrapper {
 
 impl<T: EthSpec> From<SignedBeaconBlock<T>> for BlockWrapper<T> {
     fn from(block: SignedBeaconBlock<T>) -> Self {
-        BlockWrapper(BlockWrapperInner::Block(Arc::new(block)))
+        BlockWrapper::Block(Arc::new(block))
     }
 }
 
 impl<T: EthSpec> From<Arc<SignedBeaconBlock<T>>> for BlockWrapper<T> {
     fn from(block: Arc<SignedBeaconBlock<T>>) -> Self {
-        BlockWrapper(BlockWrapperInner::Block(block))
-    }
-}
-
-impl<T: EthSpec> From<SignedBeaconBlockAndBlobsSidecar<T>> for BlockWrapper<T> {
-    fn from(block: SignedBeaconBlockAndBlobsSidecar<T>) -> Self {
-        BlockWrapper(BlockWrapperInner::BlockAndBlob(block))
+        BlockWrapper::Block(block)
     }
 }
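
As an aside for reviewers: the diffs above hinge on two shapes, namely that `BlockWrapper` is now a plain enum (the `BlockWrapperInner` newtype is gone) and that blob access goes through a fallible accessor so a block that arrived without a sidecar can have an empty one reconstructed instead of silently going missing. The sketch below is a minimal, self-contained illustration of that pattern with simplified stand-in types; the names mirror the diff, but the struct fields, method bodies, and the simplified `blobs()` signature (the real accessor also takes an optional block root) are assumptions made for illustration only, not the lighthouse implementation.

// Illustrative sketch only: simplified stand-ins, not the lighthouse types.
use std::sync::Arc;

#[derive(Debug)]
pub struct BlobReconstructionError;

#[derive(Clone, Debug)]
pub struct SignedBeaconBlock {
    pub slot: u64,
}

#[derive(Clone, Debug)]
pub struct BlobsSidecar {
    pub beacon_block_slot: u64,
}

#[derive(Clone, Debug)]
pub struct SignedBeaconBlockAndBlobsSidecar {
    pub beacon_block: Arc<SignedBeaconBlock>,
    pub blobs_sidecar: Arc<BlobsSidecar>,
}

/// Same shape as the patched enum: variants are matched directly, no inner newtype.
pub enum BlockWrapper {
    Block(Arc<SignedBeaconBlock>),
    BlockAndBlob(SignedBeaconBlockAndBlobsSidecar),
}

impl BlockWrapper {
    /// Blob access is fallible: a bare block has an (empty) sidecar reconstructed for it,
    /// while a block-and-sidecar pair hands back the sidecar it already carries.
    pub fn blobs(&self) -> Result<Option<Arc<BlobsSidecar>>, BlobReconstructionError> {
        match self {
            BlockWrapper::Block(block) => Ok(Some(Arc::new(BlobsSidecar {
                beacon_block_slot: block.slot,
            }))),
            BlockWrapper::BlockAndBlob(pair) => Ok(Some(pair.blobs_sidecar.clone())),
        }
    }
}

fn main() -> Result<(), BlobReconstructionError> {
    let wrapped = BlockWrapper::Block(Arc::new(SignedBeaconBlock { slot: 1 }));
    // Callers go through the accessor and handle the error, as the verification paths do above.
    let sidecar = wrapped.blobs()?.expect("this sketch always yields a sidecar");
    assert_eq!(sidecar.beacon_block_slot, 1);
    Ok(())
}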