From 09370e70d929358b22b8a86300795d8faa184b4d Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Thu, 26 Jan 2023 20:18:59 +0100 Subject: [PATCH] Fix rebase conflicts --- .../beacon_chain/src/attester_cache.rs | 9 +- beacon_node/beacon_chain/src/beacon_chain.rs | 97 ++++++++++--------- .../beacon_chain/src/early_attester_cache.rs | 3 +- beacon_node/beacon_chain/src/kzg_utils.rs | 2 +- .../src/engine_api/json_structures.rs | 11 +-- beacon_node/execution_layer/src/lib.rs | 49 +++++++--- beacon_node/http_api/src/publish_blocks.rs | 6 +- .../lighthouse_network/src/rpc/protocol.rs | 16 +-- .../beacon_processor/worker/gossip_methods.rs | 2 +- .../beacon_processor/worker/sync_methods.rs | 5 +- beacon_node/network/src/sync/manager.rs | 20 ++-- .../network/src/sync/network_context.rs | 78 ++++++++++----- .../network/src/sync/range_sync/chain.rs | 1 + .../per_block_processing/eip4844/eip4844.rs | 2 +- consensus/types/src/beacon_block_body.rs | 4 +- consensus/types/src/blobs_sidecar.rs | 7 +- consensus/types/src/lib.rs | 4 +- crypto/kzg/src/kzg_commitment.rs | 2 +- crypto/kzg/src/kzg_proof.rs | 2 +- crypto/kzg/src/lib.rs | 2 +- lcli/src/new_testnet.rs | 2 +- 21 files changed, 196 insertions(+), 128 deletions(-) diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 8e2dfdab8..8421b0a5c 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -14,6 +14,7 @@ use parking_lot::RwLock; use state_processing::state_advance::{partial_state_advance, Error as StateAdvanceError}; use std::collections::HashMap; use std::ops::Range; +use store::signed_beacon_block::BlobReconstructionError; use types::{ beacon_state::{ compute_committee_index_in_epoch, compute_committee_range_in_epoch, epoch_committee_count, @@ -42,7 +43,7 @@ pub enum Error { // Boxed to avoid an infinite-size recursion issue. BeaconChain(Box), MissingBeaconState(Hash256), - MissingBlobs, + MissingBlobs(BlobReconstructionError), FailedToTransitionState(StateAdvanceError), CannotAttestToFutureState { state_slot: Slot, @@ -74,6 +75,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: BlobReconstructionError) -> Self { + Error::MissingBlobs(e) + } +} + /// Stores the minimal amount of data required to compute the committee length for any committee at any /// slot in a given `epoch`. pub struct CommitteeLengths { diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 741d9a95b..bddca6ebb 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1075,27 +1075,23 @@ impl BeaconChain { match self.store.get_blobs(block_root)? { Some(blobs) => Ok(Some(blobs)), None => { - // Check for the corresponding block to understand whether we *should* have blobs. - self.get_blinded_block(block_root)? - .map(|block| { - // If there are no KZG commitments in the block, we know the sidecar should - // be empty. - let expected_kzg_commitments = - match block.message().body().blob_kzg_commitments() { - Ok(kzg_commitments) => kzg_commitments, - Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock), - }; - if expected_kzg_commitments.is_empty() { - Ok(BlobsSidecar::empty_from_parts(*block_root, block.slot())) - } else if data_availability_boundary <= block.epoch() { - // We should have blobs for all blocks younger than the boundary. 
- Err(Error::BlobsUnavailable) - } else { - // We shouldn't have blobs for blocks older than the boundary. - Err(Error::BlobsOlderThanDataAvailabilityBoundary(block.epoch())) - } - }) - .transpose() + if let Ok(Some(block)) = self.get_blinded_block(block_root) { + let expected_kzg_commitments = block.message().body().blob_kzg_commitments()?; + + if !expected_kzg_commitments.is_empty() { + Err(Error::DBInconsistent(format!( + "Expected kzg commitments but no blobs stored for block root {}", + block_root + ))) + } else { + Ok(Some(BlobsSidecar::empty_from_parts( + *block_root, + block.slot(), + ))) + } + } else { + Ok(None) + } } } } @@ -3031,7 +3027,7 @@ impl BeaconChain { // margin, or younger (of higher epoch number). if block_epoch >= import_boundary { if let Some(blobs) = blobs { - if blobs.blobs.len() > 0 { + if !blobs.blobs.is_empty() { //FIXME(sean) using this for debugging for now info!( self.log, "Writing blobs to store"; @@ -4548,7 +4544,7 @@ impl BeaconChain { None, ), BeaconState::Merge(_) => { - let (payload, _, _) = block_contents + let block_contents = block_contents .ok_or(BlockProductionError::MissingExecutionPayload)? .deconstruct(); ( @@ -4568,7 +4564,8 @@ impl BeaconChain { voluntary_exits: voluntary_exits.into(), sync_aggregate: sync_aggregate .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: payload + execution_payload: block_contents + .payload .try_into() .map_err(|_| BlockProductionError::InvalidPayloadFork)?, }, @@ -4577,7 +4574,7 @@ impl BeaconChain { ) } BeaconState::Capella(_) => { - let (payload, _, _) = block_contents + let block_contents = block_contents .ok_or(BlockProductionError::MissingExecutionPayload)? .deconstruct(); @@ -4598,7 +4595,8 @@ impl BeaconChain { voluntary_exits: voluntary_exits.into(), sync_aggregate: sync_aggregate .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: payload + execution_payload: block_contents + .payload .try_into() .map_err(|_| BlockProductionError::InvalidPayloadFork)?, bls_to_execution_changes: bls_to_execution_changes.into(), @@ -4608,10 +4606,22 @@ impl BeaconChain { ) } BeaconState::Eip4844(_) => { - let (payload, kzg_commitments, blobs) = block_contents + let block_contents_unpacked = block_contents .ok_or(BlockProductionError::MissingExecutionPayload)? 
.deconstruct(); + let (blob_kzg_commitments, blobs) = match block_contents_unpacked.blobs_content { + Some(blobs_content) => { + let kzg_commitments: KzgCommitments = + blobs_content.kzg_commitments; + let blobs: Blobs = blobs_content.blobs; + (kzg_commitments, blobs) + } + None => { + return Err(BlockProductionError::InvalidPayloadFork); + } + }; + ( BeaconBlock::Eip4844(BeaconBlockEip4844 { slot, @@ -4629,15 +4639,15 @@ impl BeaconChain { voluntary_exits: voluntary_exits.into(), sync_aggregate: sync_aggregate .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: payload + execution_payload: block_contents_unpacked + .payload .try_into() .map_err(|_| BlockProductionError::InvalidPayloadFork)?, bls_to_execution_changes: bls_to_execution_changes.into(), - blob_kzg_commitments: kzg_commitments - .ok_or(BlockProductionError::InvalidPayloadFork)?, + blob_kzg_commitments, }, }), - blobs, + Some(blobs), ) } }; @@ -4652,7 +4662,7 @@ impl BeaconChain { debug!( self.log, "Produced block on state"; - "block_size" => block_size, + "block_size" => %block_size, ); metrics::observe(&metrics::BLOCK_SIZE, block_size as f64); @@ -4695,8 +4705,8 @@ impl BeaconChain { .as_ref() .ok_or(BlockProductionError::TrustedSetupNotInitialized)?; let kzg_aggregated_proof = - kzg_utils::compute_aggregate_kzg_proof::(&kzg, &blobs) - .map_err(|e| BlockProductionError::KzgError(e))?; + kzg_utils::compute_aggregate_kzg_proof::(kzg, &blobs) + .map_err(BlockProductionError::KzgError)?; let beacon_block_root = block.canonical_root(); let expected_kzg_commitments = block.body().blob_kzg_commitments().map_err(|_| { BlockProductionError::InvalidBlockVariant( @@ -4710,7 +4720,7 @@ impl BeaconChain { kzg_aggregated_proof, }; kzg_utils::validate_blobs_sidecar( - &kzg, + kzg, slot, beacon_block_root, expected_kzg_commitments, @@ -5942,17 +5952,14 @@ impl BeaconChain { /// The epoch at which we require a data availability check in block processing. /// `None` if the `Eip4844` fork is disabled. pub fn data_availability_boundary(&self) -> Option { - self.spec - .eip4844_fork_epoch - .map(|fork_epoch| { - self.epoch().ok().map(|current_epoch| { - std::cmp::max( - fork_epoch, - current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS), - ) - }) + self.spec.eip4844_fork_epoch.and_then(|fork_epoch| { + self.epoch().ok().map(|current_epoch| { + std::cmp::max( + fork_epoch, + current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS), + ) }) - .flatten() + }) } /// The epoch that is a data availability boundary, or the latest finalized epoch. diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index 189935ba4..dd4109da9 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -165,8 +165,7 @@ impl EarlyAttesterCache { .read() .as_ref() .filter(|item| item.beacon_block_root == block_root) - .map(|item| item.blobs.clone()) - .flatten() + .and_then(|item| item.blobs.clone()) } /// Returns the proto-array block, if `block_root` matches the cached item. 
diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs
index 8589a6fe4..4ee8c3506 100644
--- a/beacon_node/beacon_chain/src/kzg_utils.rs
+++ b/beacon_node/beacon_chain/src/kzg_utils.rs
@@ -40,7 +40,7 @@ pub fn compute_aggregate_kzg_proof<T: EthSpec>(
     blobs: &[Blob<T>],
 ) -> Result<KzgProof, KzgError> {
     let blobs = blobs
-        .into_iter()
+        .iter()
         .map(|blob| ssz_blob_to_crypto_blob::<T>(blob.clone())) // TODO(pawan): avoid this clone
         .collect::<Vec<_>>();
 
diff --git a/beacon_node/execution_layer/src/engine_api/json_structures.rs b/beacon_node/execution_layer/src/engine_api/json_structures.rs
index a2297b1f2..3a5f98779 100644
--- a/beacon_node/execution_layer/src/engine_api/json_structures.rs
+++ b/beacon_node/execution_layer/src/engine_api/json_structures.rs
@@ -2,13 +2,12 @@ use super::*;
 use serde::{Deserialize, Serialize};
 use strum::EnumString;
 use superstruct::superstruct;
+use types::blobs_sidecar::KzgCommitments;
 use types::{
-    Blob, EthSpec, ExecutionBlockHash, FixedVector, KzgCommitment, Transaction, Unsigned,
+    Blobs, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadCapella,
+    ExecutionPayloadEip4844, ExecutionPayloadMerge, FixedVector, Transaction, Unsigned,
     VariableList, Withdrawal,
 };
-use types::{
-    ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge,
-};
 
 #[derive(Debug, PartialEq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -418,9 +417,9 @@ impl From<JsonPayloadAttributes> for PayloadAttributes {
 #[serde(bound = "T: EthSpec", rename_all = "camelCase")]
 pub struct JsonBlobsBundle<T: EthSpec> {
     pub block_hash: ExecutionBlockHash,
-    pub kzgs: VariableList<KzgCommitment, T::MaxBlobsPerBlock>,
+    pub kzgs: KzgCommitments<T>,
     #[serde(with = "ssz_types::serde_utils::list_of_hex_fixed_vec")]
-    pub blobs: VariableList<Blob<T>, T::MaxBlobsPerBlock>,
+    pub blobs: Blobs<T>,
 }
 
 #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs
index 3a1365db8..55f432b45 100644
--- a/beacon_node/execution_layer/src/lib.rs
+++ b/beacon_node/execution_layer/src/lib.rs
@@ -38,17 +38,18 @@ use tokio::{
     time::sleep,
 };
 use tokio_stream::wrappers::WatchStream;
 use types::consts::eip4844::BLOB_TX_TYPE;
 use types::transaction::{AccessTuple, BlobTransaction};
 use types::{AbstractExecPayload, BeaconStateError, Blob, ExecPayload, KzgCommitment};
+use types::{
+    blobs_sidecar::{Blobs, KzgCommitments},
+    ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge,
+};
 use types::{
     BlindedPayload, BlockType, ChainSpec, Epoch, ExecutionBlockHash, ForkName,
     ProposerPreparationData, PublicKeyBytes, Signature, SignedBeaconBlock, Slot, Transaction,
     Uint256,
 };
-use types::{
-    ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge,
-};
 
 mod block_hash;
 mod engine_api;
@@ -130,31 +131,44 @@ pub enum BlockProposalContents<T: EthSpec, Payload: AbstractExecPayload<T>> {
     },
     PayloadAndBlobs {
         payload: Payload,
         block_value: Uint256,
-        kzg_commitments: VariableList<KzgCommitment, T::MaxBlobsPerBlock>,
-        blobs: VariableList<Blob<T>, T::MaxBlobsPerBlock>,
+        kzg_commitments: KzgCommitments<T>,
+        blobs: Blobs<T>,
     },
 }
 
+pub struct BlockProposalBlobsContents<T: EthSpec> {
+    pub kzg_commitments: KzgCommitments<T>,
+    pub blobs: Blobs<T>,
+}
+
+pub struct BlockProposalContentsDeconstructed<T: EthSpec, Payload: AbstractExecPayload<T>> {
+    pub payload: Payload,
+    pub blobs_content: Option<BlockProposalBlobsContents<T>>,
+}
+
 impl<T: EthSpec, Payload: AbstractExecPayload<T>> BlockProposalContents<T, Payload> {
-    pub fn deconstruct(
-        self,
-    ) -> (
-        Payload,
-        Option<VariableList<KzgCommitment, T::MaxBlobsPerBlock>>,
-        Option<VariableList<Blob<T>, T::MaxBlobsPerBlock>>,
-    ) {
+    pub fn deconstruct(self) -> BlockProposalContentsDeconstructed<T, Payload> {
         match self {
             Self::Payload {
                 payload,
                 block_value: _,
-            } => (payload, None, None),
+            } => BlockProposalContentsDeconstructed {
+                payload,
+                blobs_content: None,
+            },
             Self::PayloadAndBlobs {
                 payload,
                 block_value: _,
                 kzg_commitments,
                 blobs,
-            } => (payload, Some(kzg_commitments), Some(blobs)),
+            } => BlockProposalContentsDeconstructed {
+                payload,
+                blobs_content: Some(BlockProposalBlobsContents {
+                    kzg_commitments,
+                    blobs,
+                }),
+            },
         }
     }
 
diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs
index a9a0bc9c6..e41a90c95 100644
--- a/beacon_node/http_api/src/publish_blocks.rs
+++ b/beacon_node/http_api/src/publish_blocks.rs
@@ -46,9 +46,9 @@ pub async fn publish_block<T: BeaconChainTypes>(
             block_and_blobs.into()
         } else {
             //FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required
-            return Err(warp_utils::reject::broadcast_without_import(format!(
-                "no blob cached for block"
-            )));
+            return Err(warp_utils::reject::broadcast_without_import(
+                "no blob cached for block".into(),
+            ));
         }
     } else {
         crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?;
diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs
index 7f5419d16..799e0d3da 100644
--- a/beacon_node/lighthouse_network/src/rpc/protocol.rs
+++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs
@@ -409,14 +409,14 @@ impl ProtocolId {
     /// beginning of the stream, else returns `false`.
     pub fn has_context_bytes(&self) -> bool {
         match self.version {
-            Version::V2 => match self.message_name {
-                Protocol::BlocksByRange | Protocol::BlocksByRoot => return true,
-                _ => return false,
-            },
-            Version::V1 => match self.message_name {
-                Protocol::BlobsByRange | Protocol::BlobsByRoot => return true,
-                _ => return false,
-            },
+            Version::V2 => matches!(
+                self.message_name,
+                Protocol::BlocksByRange | Protocol::BlocksByRoot
+            ),
+            Version::V1 => matches!(
+                self.message_name,
+                Protocol::BlobsByRange | Protocol::BlobsByRoot
+            ),
         }
     }
 }
diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
index e406d7094..5e84cbb5f 100644
--- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
@@ -726,7 +726,7 @@ impl<T: BeaconChainTypes> Worker<T> {
         let verification_result = self
             .chain
             .clone()
-            .verify_block_for_gossip(block.clone().into())
+            .verify_block_for_gossip(block.clone())
             .await;
 
         let block_root = if let Ok(verified_block) = &verification_result {
diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs
index 35ea835eb..8d5bd53ae 100644
--- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs
@@ -282,10 +282,7 @@ impl<T: BeaconChainTypes> Worker<T> {
         count_unrealized: CountUnrealized,
         notify_execution_layer: NotifyExecutionLayer,
     ) -> (usize, Result<(), ChainSegmentFailed>) {
-        let blocks: Vec<_> = downloaded_blocks
-            .cloned()
-            .map(|block| block.into())
-            .collect();
+        let blocks: Vec<_> = downloaded_blocks.cloned().collect();
         match self
             .chain
             .process_chain_segment(blocks,
count_unrealized, notify_execution_layer) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 6b3a7b5de..fa171cd04 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -803,15 +803,15 @@ impl SyncManager { peer_id: PeerId, block_or_blob: BlockOrBlobs, ) { - if let Some((chain_id, batch_id, block_responses)) = self + if let Some((chain_id, resp)) = self .network .range_sync_block_and_blob_response(id, block_or_blob) { - match block_responses { + match resp.responses { Ok(blocks) => { for block in blocks .into_iter() - .map(|block| Some(block)) + .map(Some) // chain the stream terminator .chain(vec![None]) { @@ -819,7 +819,7 @@ impl SyncManager { &mut self.network, peer_id, chain_id, - batch_id, + resp.batch_id, id, block, ); @@ -831,7 +831,7 @@ impl SyncManager { // With time we will want to downgrade this log warn!( self.log, "Blocks and blobs request for range received invalid data"; - "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e + "peer_id" => %peer_id, "batch_id" => resp.batch_id, "error" => e ); // TODO: penalize the peer for being a bad boy let id = RequestId::RangeBlobs { id }; @@ -849,21 +849,21 @@ impl SyncManager { peer_id: PeerId, block_or_blob: BlockOrBlobs, ) { - if let Some((batch_id, block_responses)) = self + if let Some(resp) = self .network .backfill_sync_block_and_blob_response(id, block_or_blob) { - match block_responses { + match resp.responses { Ok(blocks) => { for block in blocks .into_iter() - .map(|block| Some(block)) + .map(Some) // chain the stream terminator .chain(vec![None]) { match self.backfill_sync.on_block_response( &mut self.network, - batch_id, + resp.batch_id, &peer_id, id, block, @@ -883,7 +883,7 @@ impl SyncManager { // With time we will want to downgrade this log warn!( self.log, "Blocks and blobs request for backfill received invalid data"; - "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e + "peer_id" => %peer_id, "batch_id" => resp.batch_id, "error" => e ); // TODO: penalize the peer for being a bad boy let id = RequestId::BackFillBlobs { id }; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index e8961d292..72c2db921 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -20,6 +20,17 @@ use std::sync::Arc; use tokio::sync::mpsc; use types::{BlobsSidecar, EthSpec, SignedBeaconBlock}; +pub struct BlocksAndBlobsByRangeResponse { + pub batch_id: BatchId, + pub responses: Result>, &'static str>, +} + +pub struct BlocksAndBlobsByRangeRequest { + pub chain_id: ChainId, + pub batch_id: BatchId, + pub block_blob_info: BlocksAndBlobsRequestInfo, +} + /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. pub struct SyncNetworkContext { /// The network channel to relay messages to the Network service. @@ -38,8 +49,7 @@ pub struct SyncNetworkContext { backfill_requests: FnvHashMap, /// BlocksByRange requests paired with BlobsByRange requests made by the range. - range_blocks_and_blobs_requests: - FnvHashMap)>, + range_blocks_and_blobs_requests: FnvHashMap>, /// BlocksByRange requests paired with BlobsByRange requests made by the backfill sync. 
backfill_blocks_and_blobs_requests: @@ -198,8 +208,14 @@ impl SyncNetworkContext { request_id, })?; let block_blob_info = BlocksAndBlobsRequestInfo::default(); - self.range_blocks_and_blobs_requests - .insert(id, (chain_id, batch_id, block_blob_info)); + self.range_blocks_and_blobs_requests.insert( + id, + BlocksAndBlobsByRangeRequest { + chain_id, + batch_id, + block_blob_info, + }, + ); Ok(id) } } @@ -290,22 +306,30 @@ impl SyncNetworkContext { &mut self, request_id: Id, block_or_blob: BlockOrBlobs, - ) -> Option<( - ChainId, - BatchId, - Result>, &'static str>, - )> { + ) -> Option<(ChainId, BlocksAndBlobsByRangeResponse)> { match self.range_blocks_and_blobs_requests.entry(request_id) { Entry::Occupied(mut entry) => { - let (_, _, info) = entry.get_mut(); + let req = entry.get_mut(); + let info = &mut req.block_blob_info; match block_or_blob { BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block), BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything - let (chain_id, batch_id, info) = entry.remove(); - Some((chain_id, batch_id, info.into_responses())) + let BlocksAndBlobsByRangeRequest { + chain_id, + batch_id, + block_blob_info, + } = entry.remove(); + + Some(( + chain_id, + BlocksAndBlobsByRangeResponse { + batch_id, + responses: block_blob_info.into_responses(), + }, + )) } else { None } @@ -323,7 +347,7 @@ impl SyncNetworkContext { ByRangeRequestType::BlocksAndBlobs => self .range_blocks_and_blobs_requests .remove(&request_id) - .map(|(chain_id, batch_id, _info)| (chain_id, batch_id)), + .map(|req| (req.chain_id, req.batch_id)), ByRangeRequestType::Blocks => self.range_requests.remove(&request_id), } } @@ -349,20 +373,19 @@ impl SyncNetworkContext { is_stream_terminator: bool, ) -> Option { if is_stream_terminator { - self.backfill_requests - .remove(&request_id) - .map(|batch_id| batch_id) + self.backfill_requests.remove(&request_id) } else { self.backfill_requests.get(&request_id).copied() } } - /// Received a blocks by range response for a request that couples blocks and blobs. + /// Received a blocks by range or blobs by range response for a request that couples blocks ' + /// and blobs. pub fn backfill_sync_block_and_blob_response( &mut self, request_id: Id, block_or_blob: BlockOrBlobs, - ) -> Option<(BatchId, Result>, &'static str>)> { + ) -> Option> { match self.backfill_blocks_and_blobs_requests.entry(request_id) { Entry::Occupied(mut entry) => { let (_, info) = entry.get_mut(); @@ -373,7 +396,10 @@ impl SyncNetworkContext { if info.is_finished() { // If the request is finished, dequeue everything let (batch_id, info) = entry.remove(); - Some((batch_id, info.into_responses())) + Some(BlocksAndBlobsByRangeResponse { + batch_id, + responses: info.into_responses(), + }) } else { None } @@ -535,15 +561,17 @@ impl SyncNetworkContext { /// Check whether a batch for this epoch (and only this epoch) should request just blocks or /// blocks and blobs. 
pub fn batch_type(&self, _epoch: types::Epoch) -> ByRangeRequestType { - const _: () = assert!( - super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1 - && super::range_sync::EPOCHS_PER_BATCH == 1, - "To deal with alignment with 4844 boundaries, batches need to be of just one epoch" - ); + if super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH * super::range_sync::EPOCHS_PER_BATCH + != 1 + { + panic!( + "To deal with alignment with 4844 boundaries, batches need to be of just one epoch" + ); + } #[cfg(test)] { // Keep tests only for blocks. - return ByRangeRequestType::Blocks; + ByRangeRequestType::Blocks } #[cfg(not(test))] { diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index ea78cd3c5..262e14e4e 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -615,6 +615,7 @@ impl SyncingChain { /// /// If a previous batch has been validated and it had been re-processed, penalize the original /// peer. + #[allow(clippy::modulo_one)] fn advance_chain(&mut self, network: &mut SyncNetworkContext, validating_epoch: Epoch) { // make sure this epoch produces an advancement if validating_epoch <= self.start_epoch { diff --git a/consensus/state_processing/src/per_block_processing/eip4844/eip4844.rs b/consensus/state_processing/src/per_block_processing/eip4844/eip4844.rs index be4afc57d..51dead06d 100644 --- a/consensus/state_processing/src/per_block_processing/eip4844/eip4844.rs +++ b/consensus/state_processing/src/per_block_processing/eip4844/eip4844.rs @@ -49,7 +49,7 @@ pub fn verify_kzg_commitments_against_transactions( .flatten() // Need to use `itertools::zip_longest` here because just zipping hides if one iter is shorter // and `itertools::zip_eq` panics. 
- .zip_longest(kzg_commitments.into_iter()) + .zip_longest(kzg_commitments.iter()) .enumerate() .map(|(index, next)| match next { EitherOrBoth::Both(hash, commitment) => Ok((hash?, commitment)), diff --git a/consensus/types/src/beacon_block_body.rs b/consensus/types/src/beacon_block_body.rs index dc94ecdd5..11a47ccb0 100644 --- a/consensus/types/src/beacon_block_body.rs +++ b/consensus/types/src/beacon_block_body.rs @@ -1,5 +1,5 @@ -use crate::test_utils::TestRandom; use crate::*; +use crate::{blobs_sidecar::KzgCommitments, test_utils::TestRandom}; use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; @@ -69,7 +69,7 @@ pub struct BeaconBlockBody = FullPay pub bls_to_execution_changes: VariableList, #[superstruct(only(Eip4844))] - pub blob_kzg_commitments: VariableList, + pub blob_kzg_commitments: KzgCommitments, #[superstruct(only(Base, Altair))] #[ssz(skip_serializing, skip_deserializing)] #[tree_hash(skip_hashing)] diff --git a/consensus/types/src/blobs_sidecar.rs b/consensus/types/src/blobs_sidecar.rs index 06bcba4ff..e2560fb30 100644 --- a/consensus/types/src/blobs_sidecar.rs +++ b/consensus/types/src/blobs_sidecar.rs @@ -1,5 +1,5 @@ use crate::test_utils::TestRandom; -use crate::{Blob, EthSpec, Hash256, SignedRoot, Slot}; +use crate::{Blob, EthSpec, Hash256, KzgCommitment, SignedRoot, Slot}; use derivative::Derivative; use kzg::KzgProof; use serde_derive::{Deserialize, Serialize}; @@ -9,6 +9,9 @@ use ssz_types::VariableList; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; +pub type KzgCommitments = VariableList::MaxBlobsPerBlock>; +pub type Blobs = VariableList, ::MaxBlobsPerBlock>; + #[derive( Debug, Clone, @@ -29,7 +32,7 @@ pub struct BlobsSidecar { pub beacon_block_root: Hash256, pub beacon_block_slot: Slot, #[serde(with = "ssz_types::serde_utils::list_of_hex_fixed_vec")] - pub blobs: VariableList, T::MaxBlobsPerBlock>, + pub blobs: Blobs, pub kzg_aggregated_proof: KzgProof, } diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index c09b34f87..27bdbed24 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -121,7 +121,7 @@ pub use crate::beacon_block_body::{ pub use crate::beacon_block_header::BeaconBlockHeader; pub use crate::beacon_committee::{BeaconCommittee, OwnedBeaconCommittee}; pub use crate::beacon_state::{BeaconTreeHashCache, Error as BeaconStateError, *}; -pub use crate::blobs_sidecar::BlobsSidecar; +pub use crate::blobs_sidecar::{Blobs, BlobsSidecar, KzgCommitments}; pub use crate::bls_to_execution_change::BlsToExecutionChange; pub use crate::chain_spec::{ChainSpec, Config, Domain}; pub use crate::checkpoint::Checkpoint; @@ -177,8 +177,8 @@ pub use crate::signed_beacon_block::{ SignedBlindedBeaconBlock, }; pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader; -pub use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobsSidecar; pub use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobsSidecarDecode; +pub use crate::signed_block_and_blobs::{BlockWrapper, SignedBeaconBlockAndBlobsSidecar}; pub use crate::signed_bls_to_execution_change::SignedBlsToExecutionChange; pub use crate::signed_contribution_and_proof::SignedContributionAndProof; pub use crate::signed_voluntary_exit::SignedVoluntaryExit; diff --git a/crypto/kzg/src/kzg_commitment.rs b/crypto/kzg/src/kzg_commitment.rs index 8d6eefecd..16fe1d830 100644 --- a/crypto/kzg/src/kzg_commitment.rs +++ b/crypto/kzg/src/kzg_commitment.rs @@ -99,7 +99,7 @@ impl FromStr for KzgCommitment { impl 
Debug for KzgCommitment { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", eth2_serde_utils::hex::encode(&self.0)) + write!(f, "{}", eth2_serde_utils::hex::encode(self.0)) } } diff --git a/crypto/kzg/src/kzg_proof.rs b/crypto/kzg/src/kzg_proof.rs index 32166ee84..2dac5669b 100644 --- a/crypto/kzg/src/kzg_proof.rs +++ b/crypto/kzg/src/kzg_proof.rs @@ -123,7 +123,7 @@ impl FromStr for KzgProof { impl Debug for KzgProof { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", eth2_serde_utils::hex::encode(&self.0)) + write!(f, "{}", eth2_serde_utils::hex::encode(self.0)) } } diff --git a/crypto/kzg/src/lib.rs b/crypto/kzg/src/lib.rs index c179be06f..b6d62e053 100644 --- a/crypto/kzg/src/lib.rs +++ b/crypto/kzg/src/lib.rs @@ -72,7 +72,7 @@ impl Kzg { )); } let commitments = expected_kzg_commitments - .into_iter() + .iter() .map(|comm| comm.0.into()) .collect::>(); let proof: c_kzg::KZGProof = kzg_aggregated_proof.0.into(); diff --git a/lcli/src/new_testnet.rs b/lcli/src/new_testnet.rs index e4f68e41a..e28197e61 100644 --- a/lcli/src/new_testnet.rs +++ b/lcli/src/new_testnet.rs @@ -215,7 +215,7 @@ fn initialize_state_with_validators( // Seed RANDAO with Eth1 entropy state.fill_randao_mixes_with(eth1_block_hash); - for keypair in keypairs.into_iter() { + for keypair in keypairs.iter() { let withdrawal_credentials = |pubkey: &PublicKey| { let mut credentials = hash(&pubkey.as_ssz_bytes()); credentials[0] = spec.bls_withdrawal_prefix_byte;