From cd6655dba91e46b2f91edfdb0a39135c95ea7cff Mon Sep 17 00:00:00 2001 From: Diva M Date: Thu, 22 Dec 2022 17:30:04 -0500 Subject: [PATCH 1/7] handle no blobs from peers instead of empty blobs in range requests --- .../src/sync/block_sidecar_coupling.rs | 115 +++++++++ beacon_node/network/src/sync/manager.rs | 197 ++++++++------ beacon_node/network/src/sync/mod.rs | 1 + .../network/src/sync/network_context.rs | 241 ++++++------------ .../network/src/sync/range_sync/range.rs | 22 +- consensus/types/src/blobs_sidecar.rs | 9 +- 6 files changed, 317 insertions(+), 268 deletions(-) create mode 100644 beacon_node/network/src/sync/block_sidecar_coupling.rs diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs new file mode 100644 index 000000000..762f37692 --- /dev/null +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -0,0 +1,115 @@ +use std::{ + collections::{hash_map::OccupiedEntry, VecDeque}, + sync::Arc, +}; + +use types::{ + signed_block_and_blobs::BlockWrapper, BlobsSidecar, EthSpec, SignedBeaconBlock, + SignedBeaconBlockAndBlobsSidecar, +}; + +struct ReceivedData { + block: Option>>, + blob: Option>>, +} + +#[derive(Debug, Default)] +pub struct BlockBlobRequestInfo { + /// Blocks we have received awaiting for their corresponding sidecar. + accumulated_blocks: VecDeque>>, + /// Sidecars we have received awaiting for their corresponding block. + accumulated_sidecars: VecDeque>>, + /// Whether the individual RPC request for blocks is finished or not. + is_blocks_rpc_finished: bool, + /// Whether the individual RPC request for sidecars is finished or not. + is_sidecar_rpc_finished: bool, +} + +pub struct BlockBlobRequestEntry<'a, K, T: EthSpec> { + entry: OccupiedEntry<'a, K, BlockBlobRequestInfo>, +} + +impl<'a, K, T: EthSpec> From>> + for BlockBlobRequestEntry<'a, K, T> +{ + fn from(entry: OccupiedEntry<'a, K, BlockBlobRequestInfo>) -> Self { + BlockBlobRequestEntry { entry } + } +} + +impl BlockBlobRequestInfo { + pub fn add_block_response(&mut self, maybe_block: Option>>) { + match maybe_block { + Some(block) => self.accumulated_blocks.push_back(block), + None => self.is_blocks_rpc_finished = true, + } + } + + pub fn add_sidecar_response(&mut self, maybe_sidecar: Option>>) { + match maybe_sidecar { + Some(sidecar) => self.accumulated_sidecars.push_back(sidecar), + None => self.is_sidecar_rpc_finished = true, + } + } + + pub fn into_responses(self) -> Result>, &'static str> { + let BlockBlobRequestInfo { + accumulated_blocks, + accumulated_sidecars, + .. + } = self; + + // Create the storage for our pairs. + let mut pairs = Vec::with_capacity(accumulated_blocks.len()); + + // ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any block (empty + // included) for a skipped slot is not permitted. + for sidecar in accumulated_sidecars { + let blob_slot = sidecar.beacon_block_slot; + // First queue any blocks that might not have a blob. + while let Some(block) = { + // We identify those if their slot is less than the current blob's slot. + match accumulated_blocks.front() { + Some(borrowed_block) if borrowed_block.slot() < blob_slot => { + accumulated_blocks.pop_front() + } + Some(_) => None, + None => { + // We received a blob and ran out of blocks. 
This is a peer error + return Err("Blob without more blobs to pair with returned by peer"); + } + } + } { + pairs.push(BlockWrapper::Block { block }) + } + + // The next block must be present and must match the blob's slot + let next_block = accumulated_blocks + .pop_front() + .expect("If block stream ended, an error was previously returned"); + if next_block.slot() != blob_slot { + // We verified that the slot of the block is not less than the slot of the blob (it + // would have been returned before). It's also not equal, so this block is ahead + // than the blob. This means the blob is not paired. + return Err("Blob without a matching block returned by peer"); + } + pairs.push(BlockWrapper::BlockAndBlob { + block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar { + beacon_block: next_block, + blobs_sidecar: sidecar, + }, + }); + } + + // Every remaining block does not have a blob + for block in accumulated_blocks { + pairs.push(BlockWrapper::Block { block }) + } + + Ok(pairs) + } + + pub fn is_finished(&self) -> bool { + self.is_blocks_rpc_finished && self.is_sidecar_rpc_finished + } +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 60105d422..155e2d213 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,7 +35,7 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; -use super::network_context::SyncNetworkContext; +use super::network_context::{BlockOrBlob, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; @@ -45,11 +45,11 @@ use crate::sync::range_sync::ExpectedBatchTy; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; use futures::StreamExt; use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; -use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; +use lighthouse_network::rpc::RPCError; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::SyncInfo; use lighthouse_network::{PeerAction, PeerId}; -use slog::{crit, debug, error, info, trace, Logger}; +use slog::{crit, debug, error, info, trace, warn, Logger}; use std::boxed::Box; use std::ops::Sub; use std::sync::Arc; @@ -746,17 +746,17 @@ impl SyncManager { &mut self.network, ), RequestId::BackFillSync { id } => { - if let Some((batch_id, block)) = self.network.backfill_sync_block_response( - id, - beacon_block, - ExpectedBatchTy::OnlyBlock, - ) { + let is_stream_terminator = beacon_block.is_none(); + if let Some(batch_id) = self + .network + .backfill_sync_only_blocks_response(id, is_stream_terminator) + { match self.backfill_sync.on_block_response( &mut self.network, batch_id, &peer_id, id, - block, + beacon_block.map(|block| BlockWrapper::Block { block }), ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::Successful) => {} @@ -769,61 +769,125 @@ impl SyncManager { } } RequestId::RangeSync { id } => { - if let Some((chain_id, batch_id, block)) = self.network.range_sync_block_response( - id, - beacon_block, - ExpectedBatchTy::OnlyBlock, - ) { + let is_stream_terminator = beacon_block.is_none(); + if let Some((chain_id, batch_id)) = self + .network + .range_sync_block_response(id, is_stream_terminator) + { self.range_sync.blocks_by_range_response( &mut self.network, peer_id, chain_id, batch_id, 
id, - block, + beacon_block.map(|block| BlockWrapper::Block { block }), ); self.update_sync_state(); } } RequestId::BackFillSidecarPair { id } => { - if let Some((batch_id, block)) = self.network.backfill_sync_block_response( - id, - beacon_block, - ExpectedBatchTy::OnlyBlockBlobs, - ) { - match self.backfill_sync.on_block_response( - &mut self.network, - batch_id, - &peer_id, - id, - block, - ) { - Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), - Ok(ProcessResult::Successful) => {} - Err(_error) => { - // The backfill sync has failed, errors are reported - // within. - self.update_sync_state(); + self.block_blob_backfill_response(id, peer_id, beacon_block.into()) + } + RequestId::RangeSidecarPair { id } => { + self.block_blob_range_response(id, peer_id, beacon_block.into()) + } + } + } + + /// Handles receiving a response for a range sync request that should have both blocks and + /// blobs. + fn block_blob_range_response( + &mut self, + id: Id, + peer_id: PeerId, + block_or_blob: BlockOrBlob, + ) { + if let Some((chain_id, batch_id, block_responses)) = self + .network + .range_sync_block_and_blob_response(id, block_or_blob) + { + match block_responses { + Ok(blocks) => { + for block in blocks + .into_iter() + .map(|block| Some(block)) + // chain the stream terminator + .chain(vec![None]) + { + self.range_sync.blocks_by_range_response( + &mut self.network, + peer_id, + chain_id, + batch_id, + id, + block, + ); + self.update_sync_state(); + } + } + Err(e) => { + // inform backfill that the request needs to be treated as failed + // With time we will want to downgrade this log + warn!( + self.log, "Blocks and blobs request for backfill received invalid data"; + "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e + ); + // TODO: penalize the peer for being a bad boy + let id = RequestId::RangeSidecarPair { id }; + self.inject_error(peer_id, id, RPCError::InvalidData(e.into())) + } + } + } + } + + /// Handles receiving a response for a bacjfill sync request that should have both blocks and + /// blobs. + fn block_blob_backfill_response( + &mut self, + id: Id, + peer_id: PeerId, + block_or_blob: BlockOrBlob, + ) { + if let Some((batch_id, block_responses)) = self + .network + .backfill_sync_block_and_blob_response(id, block_or_blob) + { + match block_responses { + Ok(blocks) => { + for block in blocks + .into_iter() + .map(|block| Some(block)) + // chain the stream terminator + .chain(vec![None]) + { + match self.backfill_sync.on_block_response( + &mut self.network, + batch_id, + &peer_id, + id, + block, + ) { + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Ok(ProcessResult::Successful) => {} + Err(_error) => { + // The backfill sync has failed, errors are reported + // within. 
+ self.update_sync_state(); + } } } } - } - RequestId::RangeSidecarPair { id } => { - if let Some((chain_id, batch_id, block)) = self.network.range_sync_block_response( - id, - beacon_block, - ExpectedBatchTy::OnlyBlockBlobs, - ) { - self.range_sync.blocks_by_range_response( - &mut self.network, - peer_id, - chain_id, - batch_id, - id, - block, + Err(e) => { + // inform backfill that the request needs to be treated as failed + // With time we will want to downgrade this log + warn!( + self.log, "Blocks and blobs request for backfill received invalid data"; + "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e ); - self.update_sync_state(); + // TODO: penalize the peer for being a bad boy + let id = RequestId::BackFillSidecarPair { id }; + self.inject_error(peer_id, id, RPCError::InvalidData(e.into())) } } } @@ -844,44 +908,13 @@ impl SyncManager { unreachable!("An only blocks request does not receive sidecars") } RequestId::BackFillSidecarPair { id } => { - if let Some((batch_id, block)) = self - .network - .backfill_sync_sidecar_response(id, maybe_sidecar) - { - match self.backfill_sync.on_block_response( - &mut self.network, - batch_id, - &peer_id, - id, - block, - ) { - Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), - Ok(ProcessResult::Successful) => {} - Err(_error) => { - // The backfill sync has failed, errors are reported - // within. - self.update_sync_state(); - } - } - } + self.block_blob_backfill_response(id, peer_id, maybe_sidecar.into()) } RequestId::RangeSync { .. } => { - unreachable!("And only blocks range request does not receive sidecars") + unreachable!("Only-blocks range requests don't receive sidecars") } RequestId::RangeSidecarPair { id } => { - if let Some((chain_id, batch_id, block)) = - self.network.range_sync_sidecar_response(id, maybe_sidecar) - { - self.range_sync.blocks_by_range_response( - &mut self.network, - peer_id, - chain_id, - batch_id, - id, - block, - ); - self.update_sync_state(); - } + self.block_blob_range_response(id, peer_id, maybe_sidecar.into()) } } } diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index dc18a5c98..7b244bcec 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -3,6 +3,7 @@ //! Stores the various syncing methods for the beacon chain. mod backfill_sync; mod block_lookups; +mod block_sidecar_coupling; pub mod manager; mod network_context; mod peer_sync_info; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 978bd69d0..912a5620e 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,6 +1,7 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. 
+use super::block_sidecar_coupling::BlockBlobRequestInfo; use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ChainId, ExpectedBatchTy}; use crate::beacon_processor::WorkEvent; @@ -13,59 +14,11 @@ use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use slog::{debug, trace, warn}; -use slot_clock::SlotClock; use std::collections::hash_map::Entry; -use std::collections::VecDeque; use std::sync::Arc; use tokio::sync::mpsc; use types::signed_block_and_blobs::BlockWrapper; -use types::{ - BlobsSidecar, ChainSpec, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, -}; - -#[derive(Debug, Default)] -struct BlockBlobRequestInfo { - /// Blocks we have received awaiting for their corresponding sidecar. - accumulated_blocks: VecDeque>>, - /// Sidecars we have received awaiting for their corresponding block. - accumulated_sidecars: VecDeque>>, - /// Whether the individual RPC request for blocks is finished or not. - is_blocks_rpc_finished: bool, - /// Whether the individual RPC request for sidecars is finished or not. - is_sidecar_rpc_finished: bool, -} - -impl BlockBlobRequestInfo { - pub fn add_block_response(&mut self, maybe_block: Option>>) { - match maybe_block { - Some(block) => self.accumulated_blocks.push_back(block), - None => self.is_blocks_rpc_finished = true, - } - } - - pub fn add_sidecar_response(&mut self, maybe_sidecar: Option>>) { - match maybe_sidecar { - Some(sidecar) => self.accumulated_sidecars.push_back(sidecar), - None => self.is_sidecar_rpc_finished = true, - } - } - - pub fn pop_response(&mut self) -> Option> { - if !self.accumulated_blocks.is_empty() && !self.accumulated_sidecars.is_empty() { - let beacon_block = self.accumulated_blocks.pop_front().expect("non empty"); - let blobs_sidecar = self.accumulated_sidecars.pop_front().expect("non empty"); - return Some(SignedBeaconBlockAndBlobsSidecar { - beacon_block, - blobs_sidecar, - }); - } - None - } - - pub fn is_finished(&self) -> bool { - self.is_blocks_rpc_finished && self.is_sidecar_rpc_finished - } -} +use types::{BlobsSidecar, EthSpec, SignedBeaconBlock}; /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. pub struct SyncNetworkContext { @@ -104,6 +57,24 @@ pub struct SyncNetworkContext { log: slog::Logger, } +/// Small enumeration to make dealing with block and blob requests easier. +pub enum BlockOrBlob { + Block(Option>>), + Blob(Option>>), +} + +impl From>>> for BlockOrBlob { + fn from(block: Option>>) -> Self { + BlockOrBlob::Block(block) + } +} + +impl From>>> for BlockOrBlob { + fn from(blob: Option>>) -> Self { + BlockOrBlob::Blob(blob) + } +} + impl SyncNetworkContext { pub fn new( network_send: mpsc::UnboundedSender>, @@ -300,91 +271,45 @@ impl SyncNetworkContext { } } - /// Received a blocks by range response. + /// Response for a request that is only for blocks. 
pub fn range_sync_block_response( &mut self, request_id: Id, - maybe_block: Option>>, - batch_type: ExpectedBatchTy, - ) -> Option<(ChainId, BatchId, Option>)> { - match batch_type { - ExpectedBatchTy::OnlyBlockBlobs => { - match self.range_sidecar_pair_requests.entry(request_id) { - Entry::Occupied(mut entry) => { - let (chain_id, batch_id, info) = entry.get_mut(); - let chain_id = chain_id.clone(); - let batch_id = batch_id.clone(); - let stream_terminator = maybe_block.is_none(); - info.add_block_response(maybe_block); - let maybe_block_wrapped = info.pop_response().map(|block_sidecar_pair| { - BlockWrapper::BlockAndBlob { block_sidecar_pair } - }); - - if stream_terminator && !info.is_finished() { - return None; - } - if !stream_terminator && maybe_block_wrapped.is_none() { - return None; - } - - if info.is_finished() { - entry.remove(); - } - - Some((chain_id, batch_id, maybe_block_wrapped)) - } - Entry::Vacant(_) => None, - } - } - ExpectedBatchTy::OnlyBlock => { - // if the request is just for blocks then it can be removed on a stream termination - match maybe_block { - Some(block) => { - self.range_requests - .get(&request_id) - .cloned() - .map(|(chain_id, batch_id)| { - (chain_id, batch_id, Some(BlockWrapper::Block { block })) - }) - } - None => self - .range_requests - .remove(&request_id) - .map(|(chain_id, batch_id)| (chain_id, batch_id, None)), - } - } + is_stream_terminator: bool, + ) -> Option<(ChainId, BatchId)> { + if is_stream_terminator { + self.range_requests + .remove(&request_id) + .map(|(chain_id, batch_id)| (chain_id, batch_id)) + } else { + self.range_requests.get(&request_id).cloned() } } - pub fn range_sync_sidecar_response( + /// Received a blocks by range response for a request that couples blocks and blobs. + pub fn range_sync_block_and_blob_response( &mut self, request_id: Id, - maybe_sidecar: Option>>, - ) -> Option<(ChainId, BatchId, Option>)> { + block_or_blob: BlockOrBlob, + ) -> Option<( + ChainId, + BatchId, + Result>, &'static str>, + )> { match self.range_sidecar_pair_requests.entry(request_id) { Entry::Occupied(mut entry) => { - let (chain_id, batch_id, info) = entry.get_mut(); - let chain_id = chain_id.clone(); - let batch_id = batch_id.clone(); - let stream_terminator = maybe_sidecar.is_none(); - info.add_sidecar_response(maybe_sidecar); - let maybe_block = info - .pop_response() - .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair }); - - if stream_terminator && !info.is_finished() { - return None; + let (_, _, info) = entry.get_mut(); + match block_or_blob { + BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), + BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } - - if !stream_terminator && maybe_block.is_none() { - return None; - } - if info.is_finished() { - entry.remove(); + // If the request is finished, unqueue everything + let (chain_id, batch_id, info) = entry.remove(); + Some((chain_id, batch_id, info.into_responses())) + } else { + None } - - Some((chain_id, batch_id, maybe_block)) } Entry::Vacant(_) => None, } @@ -418,65 +343,41 @@ impl SyncNetworkContext { } } - /// Received a blocks by range response. - pub fn backfill_sync_block_response( + /// Response for a request that is only for blocks. 
+ pub fn backfill_sync_only_blocks_response( &mut self, request_id: Id, - maybe_block: Option>>, - batch_type: ExpectedBatchTy, - ) -> Option<(BatchId, Option>)> { - match batch_type { - ExpectedBatchTy::OnlyBlockBlobs => { - match self.backfill_sidecar_pair_requests.entry(request_id) { - Entry::Occupied(mut entry) => { - let (batch_id, info) = entry.get_mut(); - let batch_id = batch_id.clone(); - info.add_block_response(maybe_block); - let maybe_block = info.pop_response().map(|block_sidecar_pair| { - BlockWrapper::BlockAndBlob { block_sidecar_pair } - }); - if info.is_finished() { - entry.remove(); - } - Some((batch_id, maybe_block)) - } - Entry::Vacant(_) => None, - } - } - ExpectedBatchTy::OnlyBlock => { - // if the request is just for blocks then it can be removed on a stream termination - match maybe_block { - Some(block) => self - .backfill_requests - .get(&request_id) - .cloned() - .map(|batch_id| (batch_id, Some(BlockWrapper::Block { block }))), - None => self - .backfill_requests - .remove(&request_id) - .map(|batch_id| (batch_id, None)), - } - } + is_stream_terminator: bool, + ) -> Option { + if is_stream_terminator { + self.backfill_requests + .remove(&request_id) + .map(|batch_id| batch_id) + } else { + self.backfill_requests.get(&request_id).cloned() } } - pub fn backfill_sync_sidecar_response( + /// Received a blocks by range response for a request that couples blocks and blobs. + pub fn backfill_sync_block_and_blob_response( &mut self, request_id: Id, - maybe_sidecar: Option>>, - ) -> Option<(BatchId, Option>)> { + block_or_blob: BlockOrBlob, + ) -> Option<(BatchId, Result>, &'static str>)> { match self.backfill_sidecar_pair_requests.entry(request_id) { Entry::Occupied(mut entry) => { - let (batch_id, info) = entry.get_mut(); - let batch_id = batch_id.clone(); - info.add_sidecar_response(maybe_sidecar); - let maybe_block = info - .pop_response() - .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair }); - if info.is_finished() { - entry.remove(); + let (_, info) = entry.get_mut(); + match block_or_blob { + BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), + BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + } + if info.is_finished() { + // If the request is finished, unqueue everything + let (batch_id, info) = entry.remove(); + Some((batch_id, info.into_responses())) + } else { + None } - Some((batch_id, maybe_block)) } Entry::Vacant(_) => None, } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 73e6f49eb..a4869f75b 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -686,13 +686,10 @@ mod tests { // add some peers let (peer1, local_info, head_info) = rig.head_peer(); range.add_peer(&mut rig.cx, local_info, peer1, head_info); - let ((chain1, batch1, _), id1) = match rig.grab_request(&peer1).0 { - RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => ( - rig.cx - .range_sync_block_response(id, None, ExpectedBatchTy::OnlyBlock) - .unwrap(), - id, - ), + let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 { + RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => { + (rig.cx.range_sync_response(id, true).unwrap(), id) + } other => panic!("unexpected request {:?}", other), }; @@ -708,13 +705,10 @@ mod tests { // while the ee is offline, more peers might arrive. Add a new finalized peer. 
let (peer2, local_info, finalized_info) = rig.finalized_peer(); range.add_peer(&mut rig.cx, local_info, peer2, finalized_info); - let ((chain2, batch2, _), id2) = match rig.grab_request(&peer2).0 { - RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => ( - rig.cx - .range_sync_block_response(id, None, ExpectedBatchTy::OnlyBlock) - .unwrap(), - id, - ), + let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 { + RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => { + (rig.cx.range_sync_response(id, true).unwrap(), id) + } other => panic!("unexpected request {:?}", other), }; diff --git a/consensus/types/src/blobs_sidecar.rs b/consensus/types/src/blobs_sidecar.rs index 430936cc2..f1e2a4bb1 100644 --- a/consensus/types/src/blobs_sidecar.rs +++ b/consensus/types/src/blobs_sidecar.rs @@ -1,13 +1,17 @@ -use crate::{Blob, EthSpec, Hash256, SignedRoot, Slot}; +use crate::test_utils::TestRandom; +use crate::{Blob, EthSpec, Hash256, SignedBeaconBlock, SignedRoot, Slot}; use kzg::KzgProof; use serde_derive::{Deserialize, Serialize}; use ssz::Encode; use ssz_derive::{Decode, Encode}; use ssz_types::VariableList; +use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] -#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq, Default)] +#[derive( + Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq, Default, TestRandom, +)] #[serde(bound = "T: EthSpec")] pub struct BlobsSidecar { pub beacon_block_root: Hash256, @@ -23,6 +27,7 @@ impl BlobsSidecar { pub fn empty() -> Self { Self::default() } + #[allow(clippy::integer_arithmetic)] pub fn max_size() -> usize { // Fixed part From 847f0de0ea20dcf8f27525b731102260828006c1 Mon Sep 17 00:00:00 2001 From: Diva M Date: Thu, 22 Dec 2022 17:32:33 -0500 Subject: [PATCH 2/7] change base From fbc147e273d15b779587154d89d2691ff77c749a Mon Sep 17 00:00:00 2001 From: Diva M Date: Thu, 22 Dec 2022 17:34:01 -0500 Subject: [PATCH 3/7] remove unused entry struct --- .../network/src/sync/block_sidecar_coupling.rs | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 762f37692..60c5a62ea 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{hash_map::OccupiedEntry, VecDeque}, - sync::Arc, -}; +use std::{collections::VecDeque, sync::Arc}; use types::{ signed_block_and_blobs::BlockWrapper, BlobsSidecar, EthSpec, SignedBeaconBlock, @@ -25,18 +22,6 @@ pub struct BlockBlobRequestInfo { is_sidecar_rpc_finished: bool, } -pub struct BlockBlobRequestEntry<'a, K, T: EthSpec> { - entry: OccupiedEntry<'a, K, BlockBlobRequestInfo>, -} - -impl<'a, K, T: EthSpec> From>> - for BlockBlobRequestEntry<'a, K, T> -{ - fn from(entry: OccupiedEntry<'a, K, BlockBlobRequestInfo>) -> Self { - BlockBlobRequestEntry { entry } - } -} - impl BlockBlobRequestInfo { pub fn add_block_response(&mut self, maybe_block: Option>>) { match maybe_block { From e24f6c93d90387c7c17071add448fbdb1e307147 Mon Sep 17 00:00:00 2001 From: Diva M Date: Thu, 22 Dec 2022 17:38:16 -0500 Subject: [PATCH 4/7] fix ctrl c'd comment --- beacon_node/network/src/sync/manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/sync/manager.rs 
b/beacon_node/network/src/sync/manager.rs index 155e2d213..f003ca27e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -827,10 +827,10 @@ impl SyncManager { } } Err(e) => { - // inform backfill that the request needs to be treated as failed + // inform range that the request needs to be treated as failed // With time we will want to downgrade this log warn!( - self.log, "Blocks and blobs request for backfill received invalid data"; + self.log, "Blocks and blobs request for range received invalid data"; "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e ); // TODO: penalize the peer for being a bad boy From 48ff56d9cb0173b4a61b6727017b5059b59275c6 Mon Sep 17 00:00:00 2001 From: Diva M Date: Thu, 22 Dec 2022 17:38:55 -0500 Subject: [PATCH 5/7] spelling --- beacon_node/network/src/sync/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index f003ca27e..305f9f706 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -841,7 +841,7 @@ impl SyncManager { } } - /// Handles receiving a response for a bacjfill sync request that should have both blocks and + /// Handles receiving a response for a Backfill sync request that should have both blocks and /// blobs. fn block_blob_backfill_response( &mut self, From 3643f5cc19fe02d661ab08a61289113b6901211e Mon Sep 17 00:00:00 2001 From: Diva M Date: Thu, 22 Dec 2022 17:47:36 -0500 Subject: [PATCH 6/7] spelling --- beacon_node/network/src/sync/network_context.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 912a5620e..8e651e897 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -304,7 +304,7 @@ impl SyncNetworkContext { BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { - // If the request is finished, unqueue everything + // If the request is finished, dequeue everything let (chain_id, batch_id, info) = entry.remove(); Some((chain_id, batch_id, info.into_responses())) } else { @@ -372,7 +372,7 @@ impl SyncNetworkContext { BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { - // If the request is finished, unqueue everything + // If the request is finished, dequeue everything let (batch_id, info) = entry.remove(); Some((batch_id, info.into_responses())) } else { From 66f9aa922d5f049deb1eec8a7877b9aaba163224 Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 23 Dec 2022 09:52:10 -0500 Subject: [PATCH 7/7] clean up and improvements --- .../src/sync/block_sidecar_coupling.rs | 87 +++++++------------ .../network/src/sync/network_context.rs | 8 +- 2 files changed, 36 insertions(+), 59 deletions(-) diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 60c5a62ea..95acadffb 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -5,11 +5,6 @@ use types::{ SignedBeaconBlockAndBlobsSidecar, }; -struct ReceivedData { - block: Option>>, - blob: Option>>, -} - #[derive(Debug, Default)] pub struct BlockBlobRequestInfo { /// Blocks we have received awaiting for their corresponding sidecar. 
@@ -17,84 +12,68 @@ pub struct BlockBlobRequestInfo { /// Sidecars we have received awaiting for their corresponding block. accumulated_sidecars: VecDeque>>, /// Whether the individual RPC request for blocks is finished or not. - is_blocks_rpc_finished: bool, + is_blocks_stream_terminated: bool, /// Whether the individual RPC request for sidecars is finished or not. - is_sidecar_rpc_finished: bool, + is_sidecars_stream_terminated: bool, } impl BlockBlobRequestInfo { pub fn add_block_response(&mut self, maybe_block: Option>>) { match maybe_block { Some(block) => self.accumulated_blocks.push_back(block), - None => self.is_blocks_rpc_finished = true, + None => self.is_blocks_stream_terminated = true, } } pub fn add_sidecar_response(&mut self, maybe_sidecar: Option>>) { match maybe_sidecar { Some(sidecar) => self.accumulated_sidecars.push_back(sidecar), - None => self.is_sidecar_rpc_finished = true, + None => self.is_sidecars_stream_terminated = true, } } pub fn into_responses(self) -> Result>, &'static str> { let BlockBlobRequestInfo { accumulated_blocks, - accumulated_sidecars, + mut accumulated_sidecars, .. } = self; - // Create the storage for our pairs. - let mut pairs = Vec::with_capacity(accumulated_blocks.len()); - - // ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any block (empty + // ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any blob (empty // included) for a skipped slot is not permitted. - for sidecar in accumulated_sidecars { - let blob_slot = sidecar.beacon_block_slot; - // First queue any blocks that might not have a blob. - while let Some(block) = { - // We identify those if their slot is less than the current blob's slot. - match accumulated_blocks.front() { - Some(borrowed_block) if borrowed_block.slot() < blob_slot => { - accumulated_blocks.pop_front() - } - Some(_) => None, - None => { - // We received a blob and ran out of blocks. This is a peer error - return Err("Blob without more blobs to pair with returned by peer"); - } + let pairs = accumulated_blocks + .into_iter() + .map(|beacon_block| { + if accumulated_sidecars + .front() + .map(|sidecar| sidecar.beacon_block_slot == beacon_block.slot()) + .unwrap_or(false) + { + let blobs_sidecar = + accumulated_sidecars.pop_front().ok_or("missing sidecar")?; + Ok(BlockWrapper::BlockAndBlob { + block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar { + beacon_block, + blobs_sidecar, + }, + }) + } else { + Ok(BlockWrapper::Block { + block: beacon_block, + }) } - } { - pairs.push(BlockWrapper::Block { block }) - } + }) + .collect::, _>>(); - // The next block must be present and must match the blob's slot - let next_block = accumulated_blocks - .pop_front() - .expect("If block stream ended, an error was previously returned"); - if next_block.slot() != blob_slot { - // We verified that the slot of the block is not less than the slot of the blob (it - // would have been returned before). It's also not equal, so this block is ahead - // than the blob. This means the blob is not paired. - return Err("Blob without a matching block returned by peer"); - } - pairs.push(BlockWrapper::BlockAndBlob { - block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar { - beacon_block: next_block, - blobs_sidecar: sidecar, - }, - }); + // if accumulated sidecars is not empty, throw an error. 
+ if !accumulated_sidecars.is_empty() { + return Err("Received more sidecars than blocks"); } - // Every remaining block does not have a blob - for block in accumulated_blocks { - pairs.push(BlockWrapper::Block { block }) - } - - Ok(pairs) + pairs } pub fn is_finished(&self) -> bool { - self.is_blocks_rpc_finished && self.is_sidecar_rpc_finished + self.is_blocks_stream_terminated && self.is_sidecars_stream_terminated } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 8e651e897..cc94db77c 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -278,11 +278,9 @@ impl SyncNetworkContext { is_stream_terminator: bool, ) -> Option<(ChainId, BatchId)> { if is_stream_terminator { - self.range_requests - .remove(&request_id) - .map(|(chain_id, batch_id)| (chain_id, batch_id)) + self.range_requests.remove(&request_id) } else { - self.range_requests.get(&request_id).cloned() + self.range_requests.get(&request_id).copied() } } @@ -354,7 +352,7 @@ impl SyncNetworkContext { .remove(&request_id) .map(|batch_id| batch_id) } else { - self.backfill_requests.get(&request_id).cloned() + self.backfill_requests.get(&request_id).copied() } }
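
For reference, a minimal standalone sketch of the pairing rule that patch 7's `BlockBlobRequestInfo::into_responses` implements, assuming simplified stand-in types (plain `u64` slots and a made-up `Paired` enum) instead of the real `SignedBeaconBlock` / `BlobsSidecar` / `BlockWrapper` types; it only illustrates the matching logic, not the actual Lighthouse API:

// Standalone illustration of the block/sidecar pairing rule: blocks and sidecars
// arrive in slot order, a sidecar is consumed only when its slot matches the next
// block's slot, and any leftover sidecar is treated as a peer fault.
use std::collections::VecDeque;

#[derive(Debug)]
enum Paired {
    BlockOnly(u64),     // block slot with no sidecar
    BlockAndBlob(u64),  // block slot with a matching sidecar
}

fn pair(blocks: Vec<u64>, sidecars: Vec<u64>) -> Result<Vec<Paired>, &'static str> {
    let mut sidecars: VecDeque<u64> = sidecars.into();
    let pairs = blocks
        .into_iter()
        .map(|block_slot| {
            // Consume the front sidecar only if it belongs to this block's slot.
            if sidecars.front() == Some(&block_slot) {
                sidecars.pop_front();
                Paired::BlockAndBlob(block_slot)
            } else {
                Paired::BlockOnly(block_slot)
            }
        })
        .collect::<Vec<_>>();

    // A sidecar with no block to pair with is a peer error.
    if !sidecars.is_empty() {
        return Err("Received more sidecars than blocks");
    }
    Ok(pairs)
}

fn main() {
    // Blocks at slots 1, 2, 4; sidecars only for slots 2 and 4.
    let out = pair(vec![1, 2, 4], vec![2, 4]).unwrap();
    println!("{:?}", out); // [BlockOnly(1), BlockAndBlob(2), BlockAndBlob(4)]

    // A sidecar whose slot never matches a block is rejected.
    assert!(pair(vec![1, 2], vec![3]).is_err());
}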