clean up and improvements

This commit is contained in:
Diva M 2022-12-23 09:52:10 -05:00
parent 3643f5cc19
commit 66f9aa922d
No known key found for this signature in database
GPG Key ID: 1BAE5E01126680FE
2 changed files with 36 additions and 59 deletions

View File

@ -5,11 +5,6 @@ use types::{
SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockAndBlobsSidecar,
}; };
struct ReceivedData<T: EthSpec> {
block: Option<Arc<SignedBeaconBlock<T>>>,
blob: Option<Arc<BlobsSidecar<T>>>,
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct BlockBlobRequestInfo<T: EthSpec> { pub struct BlockBlobRequestInfo<T: EthSpec> {
/// Blocks we have received awaiting for their corresponding sidecar. /// Blocks we have received awaiting for their corresponding sidecar.
@ -17,84 +12,68 @@ pub struct BlockBlobRequestInfo<T: EthSpec> {
/// Sidecars we have received awaiting for their corresponding block. /// Sidecars we have received awaiting for their corresponding block.
accumulated_sidecars: VecDeque<Arc<BlobsSidecar<T>>>, accumulated_sidecars: VecDeque<Arc<BlobsSidecar<T>>>,
/// Whether the individual RPC request for blocks is finished or not. /// Whether the individual RPC request for blocks is finished or not.
is_blocks_rpc_finished: bool, is_blocks_stream_terminated: bool,
/// Whether the individual RPC request for sidecars is finished or not. /// Whether the individual RPC request for sidecars is finished or not.
is_sidecar_rpc_finished: bool, is_sidecars_stream_terminated: bool,
} }
impl<T: EthSpec> BlockBlobRequestInfo<T> { impl<T: EthSpec> BlockBlobRequestInfo<T> {
pub fn add_block_response(&mut self, maybe_block: Option<Arc<SignedBeaconBlock<T>>>) { pub fn add_block_response(&mut self, maybe_block: Option<Arc<SignedBeaconBlock<T>>>) {
match maybe_block { match maybe_block {
Some(block) => self.accumulated_blocks.push_back(block), Some(block) => self.accumulated_blocks.push_back(block),
None => self.is_blocks_rpc_finished = true, None => self.is_blocks_stream_terminated = true,
} }
} }
pub fn add_sidecar_response(&mut self, maybe_sidecar: Option<Arc<BlobsSidecar<T>>>) { pub fn add_sidecar_response(&mut self, maybe_sidecar: Option<Arc<BlobsSidecar<T>>>) {
match maybe_sidecar { match maybe_sidecar {
Some(sidecar) => self.accumulated_sidecars.push_back(sidecar), Some(sidecar) => self.accumulated_sidecars.push_back(sidecar),
None => self.is_sidecar_rpc_finished = true, None => self.is_sidecars_stream_terminated = true,
} }
} }
pub fn into_responses(self) -> Result<Vec<BlockWrapper<T>>, &'static str> { pub fn into_responses(self) -> Result<Vec<BlockWrapper<T>>, &'static str> {
let BlockBlobRequestInfo { let BlockBlobRequestInfo {
accumulated_blocks, accumulated_blocks,
accumulated_sidecars, mut accumulated_sidecars,
.. ..
} = self; } = self;
// Create the storage for our pairs. // ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any blob (empty
let mut pairs = Vec::with_capacity(accumulated_blocks.len());
// ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any block (empty
// included) for a skipped slot is not permitted. // included) for a skipped slot is not permitted.
for sidecar in accumulated_sidecars { let pairs = accumulated_blocks
let blob_slot = sidecar.beacon_block_slot; .into_iter()
// First queue any blocks that might not have a blob. .map(|beacon_block| {
while let Some(block) = { if accumulated_sidecars
// We identify those if their slot is less than the current blob's slot. .front()
match accumulated_blocks.front() { .map(|sidecar| sidecar.beacon_block_slot == beacon_block.slot())
Some(borrowed_block) if borrowed_block.slot() < blob_slot => { .unwrap_or(false)
accumulated_blocks.pop_front() {
} let blobs_sidecar =
Some(_) => None, accumulated_sidecars.pop_front().ok_or("missing sidecar")?;
None => { Ok(BlockWrapper::BlockAndBlob {
// We received a blob and ran out of blocks. This is a peer error
return Err("Blob without more blobs to pair with returned by peer");
}
}
} {
pairs.push(BlockWrapper::Block { block })
}
// The next block must be present and must match the blob's slot
let next_block = accumulated_blocks
.pop_front()
.expect("If block stream ended, an error was previously returned");
if next_block.slot() != blob_slot {
// We verified that the slot of the block is not less than the slot of the blob (it
// would have been returned before). It's also not equal, so this block is ahead
// than the blob. This means the blob is not paired.
return Err("Blob without a matching block returned by peer");
}
pairs.push(BlockWrapper::BlockAndBlob {
block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar { block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar {
beacon_block: next_block, beacon_block,
blobs_sidecar: sidecar, blobs_sidecar,
}, },
}); })
} else {
Ok(BlockWrapper::Block {
block: beacon_block,
})
}
})
.collect::<Result<Vec<_>, _>>();
// if accumulated sidecars is not empty, throw an error.
if !accumulated_sidecars.is_empty() {
return Err("Received more sidecars than blocks");
} }
// Every remaining block does not have a blob pairs
for block in accumulated_blocks {
pairs.push(BlockWrapper::Block { block })
}
Ok(pairs)
} }
pub fn is_finished(&self) -> bool { pub fn is_finished(&self) -> bool {
self.is_blocks_rpc_finished && self.is_sidecar_rpc_finished self.is_blocks_stream_terminated && self.is_sidecars_stream_terminated
} }
} }

View File

@ -278,11 +278,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
is_stream_terminator: bool, is_stream_terminator: bool,
) -> Option<(ChainId, BatchId)> { ) -> Option<(ChainId, BatchId)> {
if is_stream_terminator { if is_stream_terminator {
self.range_requests self.range_requests.remove(&request_id)
.remove(&request_id)
.map(|(chain_id, batch_id)| (chain_id, batch_id))
} else { } else {
self.range_requests.get(&request_id).cloned() self.range_requests.get(&request_id).copied()
} }
} }
@ -354,7 +352,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.remove(&request_id) .remove(&request_id)
.map(|batch_id| batch_id) .map(|batch_id| batch_id)
} else { } else {
self.backfill_requests.get(&request_id).cloned() self.backfill_requests.get(&request_id).copied()
} }
} }