Fix block and blob coupling in the network context (#4086)

* update docs

* introduce a temp enum to model an adjusted `BlockWrapper` and fix blob coupling

* fix compilation issue

* fix blob coupling in the network context

* review comments
This commit is contained in:
Divma 2023-03-15 11:04:45 -05:00 committed by GitHub
parent 2ef3ebbef3
commit 2c9477de43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 80 additions and 42 deletions

View File

@ -1,14 +1,13 @@
use beacon_chain::blob_verification::BlockWrapper; use super::network_context::TempBlockWrapper;
use std::{collections::VecDeque, sync::Arc}; use std::{collections::VecDeque, sync::Arc};
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
use types::{BlobsSidecar, EthSpec, SignedBeaconBlock};
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct BlocksAndBlobsRequestInfo<T: EthSpec> { pub struct BlocksAndBlobsRequestInfo<T: EthSpec> {
/// Blocks we have received awaiting for their corresponding sidecar. /// Blocks we have received awaiting for their corresponding sidecar.
accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<T>>>, accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<T>>>,
/// Sidecars we have received awaiting for their corresponding block. /// Sidecars we have received awaiting for their corresponding block.
accumulated_sidecars: VecDeque<Arc<BlobsSidecar<T>>>, accumulated_sidecars: VecDeque<Arc<BlobSidecar<T>>>,
/// Whether the individual RPC request for blocks is finished or not. /// Whether the individual RPC request for blocks is finished or not.
is_blocks_stream_terminated: bool, is_blocks_stream_terminated: bool,
/// Whether the individual RPC request for sidecars is finished or not. /// Whether the individual RPC request for sidecars is finished or not.
@ -23,14 +22,14 @@ impl<T: EthSpec> BlocksAndBlobsRequestInfo<T> {
} }
} }
pub fn add_sidecar_response(&mut self, maybe_sidecar: Option<Arc<BlobsSidecar<T>>>) { pub fn add_sidecar_response(&mut self, maybe_sidecar: Option<Arc<BlobSidecar<T>>>) {
match maybe_sidecar { match maybe_sidecar {
Some(sidecar) => self.accumulated_sidecars.push_back(sidecar), Some(sidecar) => self.accumulated_sidecars.push_back(sidecar),
None => self.is_sidecars_stream_terminated = true, None => self.is_sidecars_stream_terminated = true,
} }
} }
pub fn into_responses(self) -> Result<Vec<BlockWrapper<T>>, &'static str> { pub fn into_responses(self) -> Result<Vec<TempBlockWrapper<T>>, &'static str> {
let BlocksAndBlobsRequestInfo { let BlocksAndBlobsRequestInfo {
accumulated_blocks, accumulated_blocks,
mut accumulated_sidecars, mut accumulated_sidecars,
@ -39,28 +38,33 @@ impl<T: EthSpec> BlocksAndBlobsRequestInfo<T> {
// ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any blob (empty // ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any blob (empty
// included) for a skipped slot is not permitted. // included) for a skipped slot is not permitted.
let pairs = accumulated_blocks let mut responses = Vec::with_capacity(accumulated_blocks.len());
.into_iter() let mut blob_iter = accumulated_sidecars.into_iter().peekable();
.map(|beacon_block| { for block in accumulated_blocks.into_iter() {
if accumulated_sidecars let mut blob_list = Vec::with_capacity(T::max_blobs_per_block());
.front() while {
.map(|sidecar| sidecar.beacon_block_slot == beacon_block.slot()) let pair_next_blob = blob_iter
.unwrap_or(false) .peek()
{ .map(|sidecar| sidecar.slot == block.slot())
let blobs_sidecar = accumulated_sidecars.pop_front(); .unwrap_or(false);
BlockWrapper::new(beacon_block, blobs_sidecar) pair_next_blob
} else { } {
BlockWrapper::new(beacon_block, None) blob_list.push(blob_iter.next().expect("iterator is not empty"));
} }
})
.collect::<Vec<_>>();
// if accumulated sidecars is not empty, throw an error. if blob_list.is_empty() {
if !accumulated_sidecars.is_empty() { responses.push(TempBlockWrapper::Block(block))
return Err("Received more sidecars than blocks"); } else {
responses.push(TempBlockWrapper::BlockAndBlobList(block, blob_list))
}
} }
Ok(pairs) // if accumulated sidecars is not empty, throw an error.
if blob_iter.next().is_some() {
return Err("Received sidecars that don't pair well");
}
Ok(responses)
} }
pub fn is_finished(&self) -> bool { pub fn is_finished(&self) -> bool {

View File

@ -78,11 +78,11 @@ pub enum RequestId {
ParentLookup { id: Id }, ParentLookup { id: Id },
/// Request was from the backfill sync algorithm. /// Request was from the backfill sync algorithm.
BackFillBlocks { id: Id }, BackFillBlocks { id: Id },
/// Backfill request for blob sidecars. /// Backfill request that is composed by both a block range request and a blob range request.
BackFillBlobs { id: Id }, BackFillBlobs { id: Id },
/// The request was from a chain in the range sync algorithm. /// The request was from a chain in the range sync algorithm.
RangeBlocks { id: Id }, RangeBlocks { id: Id },
/// The request was from a chain in range, asking for ranges blob sidecars. /// Range request that is composed by both a block range request and a blob range request.
RangeBlobs { id: Id }, RangeBlobs { id: Id },
} }

View File

@ -18,7 +18,13 @@ use slog::{debug, trace, warn};
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::{BlobsSidecar, EthSpec, SignedBeaconBlock}; use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
// Temporary struct to handle incremental changes in the meantime.
pub enum TempBlockWrapper<T: EthSpec> {
Block(Arc<SignedBeaconBlock<T>>),
BlockAndBlobList(Arc<SignedBeaconBlock<T>>, Vec<Arc<BlobSidecar<T>>>),
}
pub struct BlocksAndBlobsByRangeResponse<T: EthSpec> { pub struct BlocksAndBlobsByRangeResponse<T: EthSpec> {
pub batch_id: BatchId, pub batch_id: BatchId,
@ -71,7 +77,7 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// Small enumeration to make dealing with block and blob requests easier. /// Small enumeration to make dealing with block and blob requests easier.
pub enum BlockOrBlobs<T: EthSpec> { pub enum BlockOrBlobs<T: EthSpec> {
Block(Option<Arc<SignedBeaconBlock<T>>>), Block(Option<Arc<SignedBeaconBlock<T>>>),
Blobs(Option<Arc<BlobsSidecar<T>>>), Blobs(Option<Arc<BlobSidecar<T>>>),
} }
impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlobs<T> { impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlobs<T> {
@ -80,8 +86,8 @@ impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlobs<T> {
} }
} }
impl<T: EthSpec> From<Option<Arc<BlobsSidecar<T>>>> for BlockOrBlobs<T> { impl<T: EthSpec> From<Option<Arc<BlobSidecar<T>>>> for BlockOrBlobs<T> {
fn from(blob: Option<Arc<BlobsSidecar<T>>>) -> Self { fn from(blob: Option<Arc<BlobSidecar<T>>>) -> Self {
BlockOrBlobs::Blobs(blob) BlockOrBlobs::Blobs(blob)
} }
} }
@ -323,13 +329,25 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
block_blob_info, block_blob_info,
} = entry.remove(); } = entry.remove();
Some(( let responses = block_blob_info.into_responses();
chain_id, let unimplemented_info = match responses {
BlocksAndBlobsByRangeResponse { Ok(responses) => {
batch_id, let infos = responses
responses: block_blob_info.into_responses(), .into_iter()
}, .map(|temp_block_wrapper| match temp_block_wrapper {
)) TempBlockWrapper::Block(block) => {
format!("slot{}", block.slot())
}
TempBlockWrapper::BlockAndBlobList(block, blob_list) => {
format!("slot{}({} blobs)", block.slot(), blob_list.len())
}
})
.collect::<Vec<_>>();
infos.join(", ")
}
Err(e) => format!("Error: {e}"),
};
unimplemented!("Here we are supposed to return a block possibly paired with a Bundle of blobs, but only have a list of individual blobs. This is what we got from the network: ChainId[{chain_id}] BatchId[{batch_id}] {unimplemented_info}")
} else { } else {
None None
} }
@ -396,10 +414,26 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
if info.is_finished() { if info.is_finished() {
// If the request is finished, dequeue everything // If the request is finished, dequeue everything
let (batch_id, info) = entry.remove(); let (batch_id, info) = entry.remove();
Some(BlocksAndBlobsByRangeResponse {
batch_id, let responses = info.into_responses();
responses: info.into_responses(), let unimplemented_info = match responses {
}) Ok(responses) => {
let infos = responses
.into_iter()
.map(|temp_block_wrapper| match temp_block_wrapper {
TempBlockWrapper::Block(block) => {
format!("slot{}", block.slot())
}
TempBlockWrapper::BlockAndBlobList(block, blob_list) => {
format!("slot{}({} blobs)", block.slot(), blob_list.len())
}
})
.collect::<Vec<_>>();
infos.join(", ")
}
Err(e) => format!("Error: {e}"),
};
unimplemented!("Here we are supposed to return a block possibly paired with a Bundle of blobs for backfill, but only have a list of individual blobs. This is what we got from the network: BatchId[{batch_id}]{unimplemented_info}")
} else { } else {
None None
} }