diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 60c5a62ea..95acadffb 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -5,11 +5,6 @@ use types::{ SignedBeaconBlockAndBlobsSidecar, }; -struct ReceivedData { - block: Option>>, - blob: Option>>, -} - #[derive(Debug, Default)] pub struct BlockBlobRequestInfo { /// Blocks we have received awaiting for their corresponding sidecar. @@ -17,84 +12,68 @@ pub struct BlockBlobRequestInfo { /// Sidecars we have received awaiting for their corresponding block. accumulated_sidecars: VecDeque>>, /// Whether the individual RPC request for blocks is finished or not. - is_blocks_rpc_finished: bool, + is_blocks_stream_terminated: bool, /// Whether the individual RPC request for sidecars is finished or not. - is_sidecar_rpc_finished: bool, + is_sidecars_stream_terminated: bool, } impl BlockBlobRequestInfo { pub fn add_block_response(&mut self, maybe_block: Option>>) { match maybe_block { Some(block) => self.accumulated_blocks.push_back(block), - None => self.is_blocks_rpc_finished = true, + None => self.is_blocks_stream_terminated = true, } } pub fn add_sidecar_response(&mut self, maybe_sidecar: Option>>) { match maybe_sidecar { Some(sidecar) => self.accumulated_sidecars.push_back(sidecar), - None => self.is_sidecar_rpc_finished = true, + None => self.is_sidecars_stream_terminated = true, } } pub fn into_responses(self) -> Result>, &'static str> { let BlockBlobRequestInfo { accumulated_blocks, - accumulated_sidecars, + mut accumulated_sidecars, .. } = self; - // Create the storage for our pairs. - let mut pairs = Vec::with_capacity(accumulated_blocks.len()); - - // ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any block (empty + // ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any blob (empty // included) for a skipped slot is not permitted. - for sidecar in accumulated_sidecars { - let blob_slot = sidecar.beacon_block_slot; - // First queue any blocks that might not have a blob. - while let Some(block) = { - // We identify those if their slot is less than the current blob's slot. - match accumulated_blocks.front() { - Some(borrowed_block) if borrowed_block.slot() < blob_slot => { - accumulated_blocks.pop_front() - } - Some(_) => None, - None => { - // We received a blob and ran out of blocks. This is a peer error - return Err("Blob without more blobs to pair with returned by peer"); - } + let pairs = accumulated_blocks + .into_iter() + .map(|beacon_block| { + if accumulated_sidecars + .front() + .map(|sidecar| sidecar.beacon_block_slot == beacon_block.slot()) + .unwrap_or(false) + { + let blobs_sidecar = + accumulated_sidecars.pop_front().ok_or("missing sidecar")?; + Ok(BlockWrapper::BlockAndBlob { + block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar { + beacon_block, + blobs_sidecar, + }, + }) + } else { + Ok(BlockWrapper::Block { + block: beacon_block, + }) } - } { - pairs.push(BlockWrapper::Block { block }) - } + }) + .collect::, _>>(); - // The next block must be present and must match the blob's slot - let next_block = accumulated_blocks - .pop_front() - .expect("If block stream ended, an error was previously returned"); - if next_block.slot() != blob_slot { - // We verified that the slot of the block is not less than the slot of the blob (it - // would have been returned before). It's also not equal, so this block is ahead - // than the blob. This means the blob is not paired. - return Err("Blob without a matching block returned by peer"); - } - pairs.push(BlockWrapper::BlockAndBlob { - block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar { - beacon_block: next_block, - blobs_sidecar: sidecar, - }, - }); + // if accumulated sidecars is not empty, throw an error. + if !accumulated_sidecars.is_empty() { + return Err("Received more sidecars than blocks"); } - // Every remaining block does not have a blob - for block in accumulated_blocks { - pairs.push(BlockWrapper::Block { block }) - } - - Ok(pairs) + pairs } pub fn is_finished(&self) -> bool { - self.is_blocks_rpc_finished && self.is_sidecar_rpc_finished + self.is_blocks_stream_terminated && self.is_sidecars_stream_terminated } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 8e651e897..cc94db77c 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -278,11 +278,9 @@ impl SyncNetworkContext { is_stream_terminator: bool, ) -> Option<(ChainId, BatchId)> { if is_stream_terminator { - self.range_requests - .remove(&request_id) - .map(|(chain_id, batch_id)| (chain_id, batch_id)) + self.range_requests.remove(&request_id) } else { - self.range_requests.get(&request_id).cloned() + self.range_requests.get(&request_id).copied() } } @@ -354,7 +352,7 @@ impl SyncNetworkContext { .remove(&request_id) .map(|batch_id| batch_id) } else { - self.backfill_requests.get(&request_id).cloned() + self.backfill_requests.get(&request_id).copied() } }