diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 912dff112..0022929f5 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -975,10 +975,11 @@ impl SignatureVerifiedBlock { /// /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn new( - block: Arc>, + block_wrapper: BlockWrapper, block_root: Hash256, chain: &BeaconChain, ) -> Result> { + let (block, blobs_sidecar) = block_wrapper.deconstruct(); // Ensure the block is the correct structure for the fork at `block.slot()`. block .fork_name(&chain.spec) @@ -1009,7 +1010,8 @@ impl SignatureVerifiedBlock { Ok(Self { consensus_context: ConsensusContext::new(block.slot()) .set_current_block_root(block_root) - .set_proposer_index(block.message().proposer_index()), + .set_proposer_index(block.message().proposer_index()) + .set_blobs_sidecar(blobs_sidecar), block, block_root, parent: Some(parent), @@ -1021,11 +1023,11 @@ impl SignatureVerifiedBlock { /// As for `new` above but producing `BlockSlashInfo`. pub fn check_slashable( - block: Arc>, + block: BlockWrapper, block_root: Hash256, chain: &BeaconChain, ) -> Result>> { - let header = block.signed_block_header(); + let header = block.block().signed_block_header(); Self::new(block, block_root, chain).map_err(|e| BlockSlashInfo::from_early_error(header, e)) } @@ -1127,12 +1129,38 @@ impl IntoExecutionPendingBlock for Arc &SignedBeaconBlock { + self + } +} + +impl IntoExecutionPendingBlock for BlockWrapper { + /// Verifies the `SignedBeaconBlock` by first transforming it into a `SignatureVerifiedBlock` + /// and then using that implementation of `IntoExecutionPendingBlock` to complete verification. + fn into_execution_pending_block_slashable( + self, + block_root: Hash256, + chain: &Arc>, + ) -> Result, BlockSlashInfo>> { + // Perform an early check to prevent wasting time on irrelevant blocks. + let block_root = check_block_relevancy(self.block(), block_root, chain).map_err(|e| { + BlockSlashInfo::SignatureNotChecked(self.block().signed_block_header(), e) + })?; + SignatureVerifiedBlock::check_slashable(self, block_root, chain)? .into_execution_pending_block_slashable(block_root, chain) } fn block(&self) -> &SignedBeaconBlock { - self + self.block() } } diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 718a17ea7..4088a639c 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -149,10 +149,6 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024; /// be stored before we start dropping them. const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64; -/// The maximum number of queued `Vec<[`SignedBeaconBlockAndBlobsSidecar`]>` objects received during syncing that will -/// be stored before we start dropping them. -const MAX_BLOB_CHAIN_SEGMENT_QUEUE_LEN: usize = 64; - /// The maximum number of queued `StatusMessage` objects received from the network RPC that will be /// stored before we start dropping them. const MAX_STATUS_QUEUE_LEN: usize = 1_024; @@ -167,6 +163,8 @@ const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize = 1_024; /// will be stored before we start dropping them. const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024; +const MAX_BLOCK_AND_BLOBS_BY_ROOTS_QUEUE_LEN: usize = 1_024; + /// Maximum number of `SignedBlsToExecutionChange` messages to queue before dropping them. /// /// This value is set high to accommodate the large spike that is expected immediately after Capella @@ -218,7 +216,6 @@ pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change"; -pub const BLOB_CHAIN_SEGMENT: &str = "blob_chain_segment"; /// A simple first-in-first-out queue with a maximum length. struct FifoQueue { @@ -548,7 +545,7 @@ impl WorkEvent { /// sent to the other side of `result_tx`. pub fn rpc_beacon_block( block_root: Hash256, - block: Arc>, + block: BlockWrapper, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Self { @@ -567,7 +564,7 @@ impl WorkEvent { /// Create a new work event to import `blocks` as a beacon chain segment. pub fn chain_segment( process_id: ChainSegmentProcessId, - blocks: Vec>>, + blocks: Vec>, ) -> Self { Self { drop_during_sync: false, @@ -575,19 +572,6 @@ impl WorkEvent { } } - pub fn blob_chain_segment( - process_id: ChainSegmentProcessId, - blocks_and_blobs: Vec>, - ) -> Self { - Self { - drop_during_sync: false, - work: Work::BlobChainSegment { - process_id, - blocks_and_blobs, - }, - } - } - /// Create a new work event to process `StatusMessage`s from the RPC network. pub fn status_message(peer_id: PeerId, message: StatusMessage) -> Self { Self { @@ -818,14 +802,14 @@ pub enum Work { }, RpcBlock { block_root: Hash256, - block: Arc>, + block: BlockWrapper, seen_timestamp: Duration, process_type: BlockProcessType, should_process: bool, }, ChainSegment { process_id: ChainSegmentProcessId, - blocks: Vec>>, + blocks: Vec>, }, Status { peer_id: PeerId, @@ -856,10 +840,6 @@ pub enum Work { request_id: PeerRequestId, request: BlobsByRootRequest, }, - BlobChainSegment { - process_id: ChainSegmentProcessId, - blocks_and_blobs: Vec>, - }, } impl Work { @@ -888,7 +868,6 @@ impl Work { Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, Work::GossipBlsToExecutionChange { .. } => GOSSIP_BLS_TO_EXECUTION_CHANGE, - Work::BlobChainSegment { .. } => BLOB_CHAIN_SEGMENT, } } } @@ -1024,7 +1003,6 @@ impl BeaconProcessor { let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); - let mut blob_chain_segment_queue = FifoQueue::new(MAX_BLOB_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); let mut gossip_block_and_blobs_sidecar_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN); @@ -1033,6 +1011,7 @@ impl BeaconProcessor { let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN); let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); + let mut blbroots_queue = FifoQueue::new(MAX_BLOCK_AND_BLOBS_BY_ROOTS_QUEUE_LEN); let mut blbrange_queue = FifoQueue::new(MAX_BLOBS_BY_RANGE_QUEUE_LEN); let mut gossip_bls_to_execution_change_queue = @@ -1127,10 +1106,6 @@ impl BeaconProcessor { // blocks into the system. if let Some(item) = chain_segment_queue.pop() { self.spawn_worker(item, toolbox); - // Check sync blocks before gossip blocks, since we've already explicitly - // requested these blocks. - } else if let Some(item) = blob_chain_segment_queue.pop() { - self.spawn_worker(item, toolbox); // Sync block and blob segments have the same priority as normal chain // segments. This here might change depending on how batch processing // evolves. @@ -1268,6 +1243,10 @@ impl BeaconProcessor { self.spawn_worker(item, toolbox); } else if let Some(item) = bbroots_queue.pop() { self.spawn_worker(item, toolbox); + } else if let Some(item) = blbrange_queue.pop() { + self.spawn_worker(item, toolbox); + } else if let Some(item) = blbroots_queue.pop() { + self.spawn_worker(item, toolbox); // Check slashings after all other consensus messages so we prioritize // following head. // @@ -1402,13 +1381,8 @@ impl BeaconProcessor { Work::GossipBlsToExecutionChange { .. } => { gossip_bls_to_execution_change_queue.push(work, work_id, &self.log) } - Work::BlobsByRootsRequest { - peer_id, - request_id, - request, - } => todo!(), - Work::BlobChainSegment { .. } => { - blob_chain_segment_queue.push(work, work_id, &self.log) + Work::BlobsByRootsRequest { .. } => { + blbroots_queue.push(work, work_id, &self.log) } } } @@ -1763,13 +1737,8 @@ impl BeaconProcessor { /* * Verification for a chain segment (multiple blocks). */ - Work::ChainSegment { process_id, blocks } => task_spawner.spawn_async(async move { - let wrapped = blocks - .into_iter() - .map(|block| BlockWrapper::Block { block }) - .collect(); - worker.process_chain_segment(process_id, wrapped).await - }), + Work::ChainSegment { process_id, blocks } => task_spawner + .spawn_async(async move { worker.process_chain_segment(process_id, blocks).await }), /* * Processing of Status Messages. */ @@ -1869,18 +1838,6 @@ impl BeaconProcessor { seen_timestamp, ) }), - Work::BlobChainSegment { - process_id, - blocks_and_blobs, - } => task_spawner.spawn_async(async move { - let wrapped = blocks_and_blobs - .into_iter() - .map(|b| BlockWrapper::BlockAndBlob { - block_sidecar_pair: b, - }) - .collect(); - worker.process_chain_segment(process_id, wrapped).await - }), }; } } diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 2aeec11c3..e0c934745 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -30,6 +30,7 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; +use types::signed_block_and_blobs::BlockWrapper; use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId}; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; @@ -110,7 +111,7 @@ pub struct QueuedGossipBlock { /// It is queued for later import. pub struct QueuedRpcBlock { pub block_root: Hash256, - pub block: Arc>, + pub block: BlockWrapper, pub process_type: BlockProcessType, pub seen_timestamp: Duration, /// Indicates if the beacon chain should process this block or not. @@ -394,7 +395,7 @@ impl ReprocessQueue { debug!( log, "Sending rpc block for reprocessing"; - "block_root" => %queued_rpc_block.block.canonical_root() + "block_root" => %queued_rpc_block.block_root ); if self .ready_work_tx diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 114d9805b..05a4fb75a 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -46,7 +46,7 @@ impl Worker { pub async fn process_rpc_block( self, block_root: Hash256, - block: Arc>, + block: BlockWrapper, seen_timestamp: Duration, process_type: BlockProcessType, reprocess_tx: mpsc::Sender>, diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index e495daf3c..76850a545 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -24,11 +24,9 @@ use std::collections::{ HashMap, HashSet, }; use std::sync::Arc; +use types::signed_block_and_blobs::BlockWrapper; use types::{Epoch, EthSpec}; -use super::manager::BlockTy; -use super::range_sync::BatchTy; - /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for /// already requested slots. There is a timeout for each batch request. If this value is too high, @@ -57,7 +55,7 @@ impl BatchConfig for BackFillBatchConfig { fn max_batch_processing_attempts() -> u8 { MAX_BATCH_PROCESSING_ATTEMPTS } - fn batch_attempt_hash(blocks: &[BlockTy]) -> u64 { + fn batch_attempt_hash(blocks: &[BlockWrapper]) -> u64 { use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; let mut hasher = DefaultHasher::new(); @@ -393,7 +391,7 @@ impl BackFillSync { batch_id: BatchId, peer_id: &PeerId, request_id: Id, - beacon_block: Option>, + beacon_block: Option>, ) -> Result { // check if we have this batch let batch = match self.batches.get_mut(&batch_id) { @@ -538,12 +536,7 @@ impl BackFillSync { let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id); self.current_processing_batch = Some(batch_id); - let work_event = match blocks { - BatchTy::Blocks(blocks) => BeaconWorkEvent::chain_segment(process_id, blocks), - BatchTy::BlocksAndBlobs(blocks_and_blobs) => { - BeaconWorkEvent::blob_chain_segment(process_id, blocks_and_blobs) - } - }; + let work_event = BeaconWorkEvent::chain_segment(process_id, blocks.into_wrapped_blocks()); if let Err(e) = network.processor_channel().try_send(work_event) { crit!(self.log, "Failed to send backfill segment to processor."; "msg" => "process_batch", "error" => %e, "batch" => self.processing_target); diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 4ebbf1c1d..a1a74b5dd 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -3,12 +3,15 @@ use std::time::Duration; use beacon_chain::{BeaconChainTypes, BlockError}; use fnv::FnvHashMap; +use futures::StreamExt; +use itertools::{Either, Itertools}; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; use slog::{debug, error, trace, warn, Logger}; use smallvec::SmallVec; use std::sync::Arc; use store::{Hash256, SignedBeaconBlock}; +use types::signed_block_and_blobs::BlockWrapper; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; @@ -18,7 +21,7 @@ use self::{ single_block_lookup::SingleBlockRequest, }; -use super::manager::{BlockProcessResult, BlockTy}; +use super::manager::BlockProcessResult; use super::BatchProcessResult; use super::{ manager::{BlockProcessType, Id}, @@ -30,7 +33,7 @@ mod single_block_lookup; #[cfg(test)] mod tests; -pub type RootBlockTuple = (Hash256, BlockTy); +pub type RootBlockTuple = (Hash256, BlockWrapper); const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; @@ -106,7 +109,7 @@ impl BlockLookups { pub fn search_parent( &mut self, block_root: Hash256, - block: BlockTy, + block: BlockWrapper, peer_id: PeerId, cx: &mut SyncNetworkContext, ) { @@ -139,7 +142,7 @@ impl BlockLookups { &mut self, id: Id, peer_id: PeerId, - block: Option>, + block: Option>, seen_timestamp: Duration, cx: &mut SyncNetworkContext, ) { @@ -204,7 +207,7 @@ impl BlockLookups { &mut self, id: Id, peer_id: PeerId, - block: Option>, + block: Option>, seen_timestamp: Duration, cx: &mut SyncNetworkContext, ) { @@ -426,7 +429,7 @@ impl BlockLookups { error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e); } BlockError::ParentUnknown(block) => { - self.search_parent(root, BlockTy::Block { block }, peer_id, cx); + self.search_parent(root, BlockWrapper::Block { block }, peer_id, cx); } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { // These errors indicate that the execution layer is offline @@ -506,7 +509,7 @@ impl BlockLookups { BlockProcessResult::Err(BlockError::ParentUnknown(block)) => { // need to keep looking for parents // add the block back to the queue and continue the search - parent_lookup.add_block(BlockTy::Block { block }); + parent_lookup.add_block(BlockWrapper::Block { block }); self.request_parent(parent_lookup, cx); } BlockProcessResult::Ok @@ -525,8 +528,8 @@ impl BlockLookups { let chain_hash = parent_lookup.chain_hash(); let blocks = parent_lookup.chain_blocks(); let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); - // let work = WorkEvent::chain_segment(process_id, blocks); - let work = todo!("this means we can have batches of mixed type"); + + let work = WorkEvent::chain_segment(process_id, blocks); match beacon_processor_send.try_send(work) { Ok(_) => { @@ -634,7 +637,7 @@ impl BlockLookups { fn send_block_for_processing( &mut self, block_root: Hash256, - block: BlockTy, + block: BlockWrapper, duration: Duration, process_type: BlockProcessType, cx: &mut SyncNetworkContext, @@ -642,16 +645,7 @@ impl BlockLookups { match cx.processor_channel_if_enabled() { Some(beacon_processor_send) => { trace!(self.log, "Sending block for processing"; "block" => ?block_root, "process" => ?process_type); - let event = match block { - BlockTy::Block { block } => { - WorkEvent::rpc_beacon_block(block_root, block, duration, process_type) - } - BlockTy::BlockAndBlob { block_sidecar_pair } => { - //FIXME(sean) - // WorkEvent::rpc_block_and_glob(block_sidecar_pair) - todo!("we also need to process block-glob pairs for rpc") - } - }; + let event = WorkEvent::rpc_beacon_block(block_root, block, duration, process_type); if let Err(e) = beacon_processor_send.try_send(event) { error!( self.log, diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 563525f55..4a26e78fc 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -4,9 +4,10 @@ use lighthouse_network::PeerId; use std::sync::Arc; use store::{Hash256, SignedBeaconBlock}; use strum::IntoStaticStr; +use types::signed_block_and_blobs::BlockWrapper; use crate::sync::{ - manager::{BlockTy, Id, SLOT_IMPORT_TOLERANCE}, + manager::{Id, SLOT_IMPORT_TOLERANCE}, network_context::SyncNetworkContext, }; @@ -24,7 +25,7 @@ pub(crate) struct ParentLookup { /// The root of the block triggering this parent request. chain_hash: Hash256, /// The blocks that have currently been downloaded. - downloaded_blocks: Vec>, + downloaded_blocks: Vec>, /// Request of the last parent. current_parent_request: SingleBlockRequest, /// Id of the last parent request. @@ -59,7 +60,7 @@ impl ParentLookup { .any(|d_block| d_block.block() == block) } - pub fn new(block_root: Hash256, block: BlockTy, peer_id: PeerId) -> Self { + pub fn new(block_root: Hash256, block: BlockWrapper, peer_id: PeerId) -> Self { let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id); Self { @@ -94,7 +95,7 @@ impl ParentLookup { self.current_parent_request.check_peer_disconnected(peer_id) } - pub fn add_block(&mut self, block: BlockTy) { + pub fn add_block(&mut self, block: BlockWrapper) { let next_parent = block.parent_root(); self.downloaded_blocks.push(block); self.current_parent_request.hash = next_parent; @@ -121,7 +122,7 @@ impl ParentLookup { self.current_parent_request_id = None; } - pub fn chain_blocks(&mut self) -> Vec> { + pub fn chain_blocks(&mut self) -> Vec> { std::mem::take(&mut self.downloaded_blocks) } @@ -129,7 +130,7 @@ impl ParentLookup { /// the processing result of the block. pub fn verify_block( &mut self, - block: Option>, + block: Option>, failed_chains: &mut lru_cache::LRUTimeCache, ) -> Result>, VerifyError> { let root_and_block = self.current_parent_request.verify_block(block)?; diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 368c4f518..0e84fb0bb 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,15 +1,13 @@ -use std::collections::HashSet; -use std::sync::Arc; - -use crate::sync::manager::BlockTy; - use super::RootBlockTuple; use beacon_chain::get_block_root; use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; use rand::seq::IteratorRandom; use ssz_types::VariableList; +use std::collections::HashSet; +use std::sync::Arc; use store::{EthSpec, Hash256, SignedBeaconBlock}; use strum::IntoStaticStr; +use types::signed_block_and_blobs::BlockWrapper; /// Object representing a single block lookup request. #[derive(PartialEq, Eq)] @@ -107,7 +105,7 @@ impl SingleBlockRequest { /// Returns the block for processing if the response is what we expected. pub fn verify_block( &mut self, - block: Option>, + block: Option>, ) -> Result>, VerifyError> { match self.state { State::AwaitingDownload => { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 18b5e43b2..8bb33fe0c 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -54,6 +54,7 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; +use types::signed_block_and_blobs::BlockWrapper; use types::{ BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot, }; @@ -69,66 +70,6 @@ pub const SLOT_IMPORT_TOLERANCE: usize = 32; pub type Id = u32; -#[derive(Debug)] -pub enum BlockTy { - Block { - block: Arc>, - }, - BlockAndBlob { - block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar, - }, -} - -#[cfg(test)] -impl From> for BlockTy { - fn from(block: SignedBeaconBlock) -> Self { - BlockTy::Block { - block: Arc::new(block), - } - } -} - -#[cfg(test)] -impl From>> for BlockTy { - fn from(block: Arc>) -> Self { - BlockTy::Block { block } - } -} - -impl BlockTy { - pub fn block(&self) -> &SignedBeaconBlock { - match &self { - BlockTy::Block { block } => block, - BlockTy::BlockAndBlob { block_sidecar_pair } => &block_sidecar_pair.beacon_block, - } - } - - pub fn parent_root(&self) -> Hash256 { - self.block().parent_root() - } -} - -// TODO: probably needes to be changed. This is needed because SignedBeaconBlockAndBlobsSidecar -// does not implement Hash -impl std::hash::Hash for BlockTy { - fn hash(&self, state: &mut H) { - match self { - BlockTy::Block { block } => block.hash(state), - BlockTy::BlockAndBlob { - block_sidecar_pair: block_and_blob, - } => block_and_blob.beacon_block.hash(state), - } - } -} - -impl BlockTy { - pub fn slot(&self) -> Slot { - match self { - BlockTy::Block { block } => block.slot(), - BlockTy::BlockAndBlob { block_sidecar_pair } => block_sidecar_pair.beacon_block.slot(), - } - } -} /// Id of rpc requests sent by sync to the network. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum RequestId { @@ -645,7 +586,7 @@ impl SyncManager { // the block and blob since for block lookups we don't care. self.block_lookups.search_parent( block_root, - BlockTy::Block { block }, + BlockWrapper::Block { block }, peer_id, &mut self.network, ); @@ -795,14 +736,14 @@ impl SyncManager { RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( id, peer_id, - beacon_block.map(|block| BlockTy::Block { block }), + beacon_block.map(|block| BlockWrapper::Block { block }), seen_timestamp, &mut self.network, ), RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( id, peer_id, - beacon_block.map(|block| BlockTy::Block { block }), + beacon_block.map(|block| BlockWrapper::Block { block }), seen_timestamp, &mut self.network, ), @@ -958,7 +899,7 @@ impl SyncManager { RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( id, peer_id, - block_sidecar_pair.map(|block_sidecar_pair| BlockTy::BlockAndBlob { + block_sidecar_pair.map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { // TODO: why is this in an arc block_sidecar_pair: (*block_sidecar_pair).clone(), }), @@ -968,7 +909,7 @@ impl SyncManager { RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( id, peer_id, - block_sidecar_pair.map(|block_sidecar_pair| BlockTy::BlockAndBlob { + block_sidecar_pair.map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { // TODO: why is this in an arc block_sidecar_pair: (*block_sidecar_pair).clone(), }), diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 75eb72b82..adefb89dd 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,7 +1,7 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. -use super::manager::{BlockTy, Id, RequestId as SyncRequestId}; +use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ChainId, ExpectedBatchTy}; use crate::beacon_processor::WorkEvent; use crate::service::{NetworkMessage, RequestId}; @@ -16,6 +16,7 @@ use std::collections::hash_map::Entry; use std::collections::VecDeque; use std::sync::Arc; use tokio::sync::mpsc; +use types::signed_block_and_blobs::BlockWrapper; use types::{BlobsSidecar, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar}; #[derive(Debug, Default)] @@ -297,7 +298,7 @@ impl SyncNetworkContext { request_id: Id, maybe_block: Option>>, batch_type: ExpectedBatchTy, - ) -> Option<(ChainId, BatchId, Option>)> { + ) -> Option<(ChainId, BatchId, Option>)> { match batch_type { ExpectedBatchTy::OnlyBlockBlobs => { match self.range_sidecar_pair_requests.entry(request_id) { @@ -306,9 +307,9 @@ impl SyncNetworkContext { let chain_id = chain_id.clone(); let batch_id = batch_id.clone(); info.add_block_response(maybe_block); - let maybe_block = info - .pop_response() - .map(|block_sidecar_pair| BlockTy::BlockAndBlob { block_sidecar_pair }); + let maybe_block = info.pop_response().map(|block_sidecar_pair| { + BlockWrapper::BlockAndBlob { block_sidecar_pair } + }); if info.is_finished() { entry.remove(); } @@ -325,7 +326,7 @@ impl SyncNetworkContext { .get(&request_id) .cloned() .map(|(chain_id, batch_id)| { - (chain_id, batch_id, Some(BlockTy::Block { block })) + (chain_id, batch_id, Some(BlockWrapper::Block { block })) }) } None => self @@ -341,7 +342,7 @@ impl SyncNetworkContext { &mut self, request_id: Id, maybe_sidecar: Option>>, - ) -> Option<(ChainId, BatchId, Option>)> { + ) -> Option<(ChainId, BatchId, Option>)> { match self.range_sidecar_pair_requests.entry(request_id) { Entry::Occupied(mut entry) => { let (chain_id, batch_id, info) = entry.get_mut(); @@ -350,7 +351,7 @@ impl SyncNetworkContext { info.add_sidecar_response(maybe_sidecar); let maybe_block = info .pop_response() - .map(|block_sidecar_pair| BlockTy::BlockAndBlob { block_sidecar_pair }); + .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair }); if info.is_finished() { entry.remove(); } @@ -394,7 +395,7 @@ impl SyncNetworkContext { request_id: Id, maybe_block: Option>>, batch_type: ExpectedBatchTy, - ) -> Option<(BatchId, Option>)> { + ) -> Option<(BatchId, Option>)> { match batch_type { ExpectedBatchTy::OnlyBlockBlobs => { match self.backfill_sidecar_pair_requests.entry(request_id) { @@ -402,9 +403,9 @@ impl SyncNetworkContext { let (batch_id, info) = entry.get_mut(); let batch_id = batch_id.clone(); info.add_block_response(maybe_block); - let maybe_block = info - .pop_response() - .map(|block_sidecar_pair| BlockTy::BlockAndBlob { block_sidecar_pair }); + let maybe_block = info.pop_response().map(|block_sidecar_pair| { + BlockWrapper::BlockAndBlob { block_sidecar_pair } + }); if info.is_finished() { entry.remove(); } @@ -420,7 +421,7 @@ impl SyncNetworkContext { .backfill_requests .get(&request_id) .cloned() - .map(|batch_id| (batch_id, Some(BlockTy::Block { block }))), + .map(|batch_id| (batch_id, Some(BlockWrapper::Block { block }))), None => self .backfill_requests .remove(&request_id) @@ -434,7 +435,7 @@ impl SyncNetworkContext { &mut self, request_id: Id, maybe_sidecar: Option>>, - ) -> Option<(BatchId, Option>)> { + ) -> Option<(BatchId, Option>)> { match self.backfill_sidecar_pair_requests.entry(request_id) { Entry::Occupied(mut entry) => { let (batch_id, info) = entry.get_mut(); @@ -442,7 +443,7 @@ impl SyncNetworkContext { info.add_sidecar_response(maybe_sidecar); let maybe_block = info .pop_response() - .map(|block_sidecar_pair| BlockTy::BlockAndBlob { block_sidecar_pair }); + .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair }); if info.is_finished() { entry.remove(); } diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 80819d57e..b0d266e07 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,10 +1,11 @@ -use crate::sync::manager::{BlockTy, Id}; +use crate::sync::manager::Id; use lighthouse_network::rpc::methods::BlocksByRangeRequest; use lighthouse_network::PeerId; use std::collections::HashSet; use std::hash::{Hash, Hasher}; use std::ops::Sub; use std::sync::Arc; +use types::signed_block_and_blobs::BlockWrapper; use types::{Epoch, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot}; /// The number of times to retry a batch before it is considered failed. @@ -19,6 +20,21 @@ pub enum BatchTy { BlocksAndBlobs(Vec>), } +impl BatchTy { + pub fn into_wrapped_blocks(self) -> Vec> { + match self { + BatchTy::Blocks(blocks) => blocks + .into_iter() + .map(|block| BlockWrapper::Block { block }) + .collect(), + BatchTy::BlocksAndBlobs(block_sidecar_pair) => block_sidecar_pair + .into_iter() + .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair }) + .collect(), + } + } +} + /// Error representing a batch with mixed block types. #[derive(Debug)] pub struct MixedBlockTyErr; @@ -63,7 +79,7 @@ pub trait BatchConfig { /// Note that simpler hashing functions considered in the past (hash of first block, hash of last /// block, number of received blocks) are not good enough to differentiate attempts. For this /// reason, we hash the complete set of blocks both in RangeSync and BackFillSync. - fn batch_attempt_hash(blocks: &[BlockTy]) -> u64; + fn batch_attempt_hash(blocks: &[BlockWrapper]) -> u64; } pub struct RangeSyncBatchConfig {} @@ -75,7 +91,7 @@ impl BatchConfig for RangeSyncBatchConfig { fn max_batch_processing_attempts() -> u8 { MAX_BATCH_PROCESSING_ATTEMPTS } - fn batch_attempt_hash(blocks: &[BlockTy]) -> u64 { + fn batch_attempt_hash(blocks: &[BlockWrapper]) -> u64 { let mut hasher = std::collections::hash_map::DefaultHasher::new(); blocks.hash(&mut hasher); hasher.finish() @@ -123,9 +139,9 @@ pub enum BatchState { /// The batch has failed either downloading or processing, but can be requested again. AwaitingDownload, /// The batch is being downloaded. - Downloading(PeerId, Vec>, Id), + Downloading(PeerId, Vec>, Id), /// The batch has been completely downloaded and is ready for processing. - AwaitingProcessing(PeerId, Vec>), + AwaitingProcessing(PeerId, Vec>), /// The batch is being processed. Processing(Attempt), /// The batch was successfully processed and is waiting to be validated. @@ -258,7 +274,7 @@ impl BatchInfo { } /// Adds a block to a downloading batch. - pub fn add_block(&mut self, block: BlockTy) -> Result<(), WrongState> { + pub fn add_block(&mut self, block: BlockWrapper) -> Result<(), WrongState> { match self.state.poison() { BatchState::Downloading(peer, mut blocks, req_id) => { blocks.push(block); @@ -397,7 +413,7 @@ impl BatchInfo { match self.batch_type { ExpectedBatchTy::OnlyBlockBlobs => { let blocks = blocks.into_iter().map(|block| { - let BlockTy::BlockAndBlob { block_sidecar_pair: block_and_blob } = block else { + let BlockWrapper::BlockAndBlob { block_sidecar_pair: block_and_blob } = block else { panic!("Batches should never have a mixed type. This is a bug. Contact D") }; block_and_blob @@ -406,7 +422,7 @@ impl BatchInfo { } ExpectedBatchTy::OnlyBlock => { let blocks = blocks.into_iter().map(|block| { - let BlockTy::Block { block } = block else { + let BlockWrapper::Block { block } = block else { panic!("Batches should never have a mixed type. This is a bug. Contact D") }; block @@ -507,7 +523,7 @@ pub struct Attempt { } impl Attempt { - fn new(peer_id: PeerId, blocks: &[BlockTy]) -> Self { + fn new(peer_id: PeerId, blocks: &[BlockWrapper]) -> Self { let hash = B::batch_attempt_hash(blocks); Attempt { peer_id, hash } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 09e5bf263..199be788e 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,7 +1,6 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use super::BatchTy; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; -use crate::sync::manager::BlockTy; use crate::sync::{ manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult, }; @@ -12,6 +11,7 @@ use rand::seq::SliceRandom; use slog::{crit, debug, o, warn}; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use std::hash::{Hash, Hasher}; +use types::signed_block_and_blobs::BlockWrapper; use types::{Epoch, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of @@ -226,7 +226,7 @@ impl SyncingChain { batch_id: BatchId, peer_id: &PeerId, request_id: Id, - beacon_block: Option>, + beacon_block: Option>, ) -> ProcessingResult { // check if we have this batch let batch = match self.batches.get_mut(&batch_id) { @@ -327,12 +327,7 @@ impl SyncingChain { let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized); self.current_processing_batch = Some(batch_id); - let work_event = match blocks { - BatchTy::Blocks(blocks) => BeaconWorkEvent::chain_segment(process_id, blocks), - BatchTy::BlocksAndBlobs(blocks_and_blobs) => { - BeaconWorkEvent::blob_chain_segment(process_id, blocks_and_blobs) - } - }; + let work_event = BeaconWorkEvent::chain_segment(process_id, blocks.into_wrapped_blocks()); if let Err(e) = beacon_processor_send.try_send(work_event) { crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch", diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index b28757bc0..ca5e13397 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -44,7 +44,7 @@ use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; use crate::status::ToStatusMessage; -use crate::sync::manager::{BlockTy, Id}; +use crate::sync::manager::Id; use crate::sync::network_context::SyncNetworkContext; use crate::sync::BatchProcessResult; use beacon_chain::{BeaconChain, BeaconChainTypes}; @@ -55,6 +55,7 @@ use lru_cache::LRUTimeCache; use slog::{crit, debug, trace, warn}; use std::collections::HashMap; use std::sync::Arc; +use types::signed_block_and_blobs::BlockWrapper; use types::{Epoch, EthSpec, Hash256, Slot}; /// For how long we store failed finalized chains to prevent retries. @@ -202,7 +203,7 @@ where chain_id: ChainId, batch_id: BatchId, request_id: Id, - beacon_block: Option>, + beacon_block: Option>, ) { // check if this chunk removes the chain match self.chains.call_by_id(chain_id, |chain| { diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs index be47e66c9..83db5e73e 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -1,4 +1,4 @@ -use crate::{BlobsSidecar, EthSpec, SignedBeaconBlock, SignedBeaconBlockEip4844, Slot}; +use crate::{BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockEip4844, Slot}; use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, DecodeError}; use ssz_derive::{Decode, Encode}; @@ -74,4 +74,48 @@ impl BlockWrapper { } } } + + pub fn parent_root(&self) -> Hash256 { + self.block().parent_root() + } + + pub fn deconstruct(self) -> (Arc>, Option>>) { + match self { + BlockWrapper::Block { block } => (block, None), + BlockWrapper::BlockAndBlob { block_sidecar_pair } => { + let SignedBeaconBlockAndBlobsSidecar { + beacon_block, + blobs_sidecar, + } = block_sidecar_pair; + (beacon_block, Some(blobs_sidecar)) + } + } + } +} + +// TODO: probably needes to be changed. This is needed because SignedBeaconBlockAndBlobsSidecar +// does not implement Hash +impl std::hash::Hash for BlockWrapper { + fn hash(&self, state: &mut H) { + match self { + BlockWrapper::Block { block } => block.hash(state), + BlockWrapper::BlockAndBlob { + block_sidecar_pair: block_and_blob, + } => block_and_blob.beacon_block.hash(state), + } + } +} + +impl From> for BlockWrapper { + fn from(block: SignedBeaconBlock) -> Self { + BlockWrapper::Block { + block: Arc::new(block), + } + } +} + +impl From>> for BlockWrapper { + fn from(block: Arc>) -> Self { + BlockWrapper::Block { block } + } }