renames, remove , wrap BlockWrapper enum to make descontruction private

This commit is contained in:
realbigsean 2022-12-28 10:28:45 -05:00
parent 502b5e5bf0
commit 5b3b34a9d7
No known key found for this signature in database
GPG Key ID: B372B64D866BF8CC
19 changed files with 231 additions and 268 deletions

View File

@ -1141,7 +1141,7 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for Arc<SignedBeaconBlock
let block_root = check_block_relevancy(&self, block_root, chain) let block_root = check_block_relevancy(&self, block_root, chain)
.map_err(|e| BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e))?; .map_err(|e| BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e))?;
SignatureVerifiedBlock::check_slashable(BlockWrapper::Block(self), block_root, chain)? SignatureVerifiedBlock::check_slashable(self.into(), block_root, chain)?
.into_execution_pending_block_slashable(block_root, chain, notify_execution_layer) .into_execution_pending_block_slashable(block_root, chain, notify_execution_layer)
} }

View File

@ -32,27 +32,28 @@ pub async fn publish_block<T: BeaconChainTypes>(
// Send the block, regardless of whether or not it is valid. The API // Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour. // specification is very clear that this is the desired behaviour.
let wrapped_block = if matches!(block.as_ref(), &SignedBeaconBlock::Eip4844(_)) { let wrapped_block: BlockWrapper<T::EthSpec> =
if let Some(sidecar) = chain.blob_cache.pop(&block_root) { if matches!(block.as_ref(), &SignedBeaconBlock::Eip4844(_)) {
let block_and_blobs = SignedBeaconBlockAndBlobsSidecar { if let Some(sidecar) = chain.blob_cache.pop(&block_root) {
beacon_block: block, let block_and_blobs = SignedBeaconBlockAndBlobsSidecar {
blobs_sidecar: Arc::new(sidecar), beacon_block: block,
}; blobs_sidecar: Arc::new(sidecar),
crate::publish_pubsub_message( };
network_tx, crate::publish_pubsub_message(
PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs.clone()), network_tx,
)?; PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs.clone()),
BlockWrapper::BlockAndBlob(block_and_blobs) )?;
block_and_blobs.into()
} else {
//FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required
return Err(warp_utils::reject::broadcast_without_import(format!(
"no blob cached for block"
)));
}
} else { } else {
//FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?;
return Err(warp_utils::reject::broadcast_without_import(format!( block.into()
"no blob cached for block" };
)));
}
} else {
crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?;
BlockWrapper::Block(block)
};
// Determine the delay after the start of the slot, register it with metrics. // Determine the delay after the start of the slot, register it with metrics.
let block = wrapped_block.block(); let block = wrapped_block.block();

View File

@ -73,7 +73,7 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<
RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(),
RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(), RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(),
RPCResponse::BlobsByRoot(res) => res.as_ssz_bytes(), RPCResponse::BlockAndBlobsByRoot(res) => res.as_ssz_bytes(),
RPCResponse::LightClientBootstrap(res) => res.as_ssz_bytes(), RPCResponse::LightClientBootstrap(res) => res.as_ssz_bytes(),
RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
RPCResponse::MetaData(res) => RPCResponse::MetaData(res) =>
@ -439,7 +439,8 @@ fn context_bytes<T: EthSpec>(
SignedBeaconBlock::Base { .. } => Some(fork_context.genesis_context_bytes()), SignedBeaconBlock::Base { .. } => Some(fork_context.genesis_context_bytes()),
}; };
} }
if let RPCResponse::BlobsByRange(_) | RPCResponse::BlobsByRoot(_) = rpc_variant { if let RPCResponse::BlobsByRange(_) | RPCResponse::BlockAndBlobsByRoot(_) = rpc_variant
{
return fork_context.to_context_bytes(ForkName::Eip4844); return fork_context.to_context_bytes(ForkName::Eip4844);
} }
} }
@ -585,7 +586,7 @@ fn handle_v1_response<T: EthSpec>(
)))), )))),
_ => Err(RPCError::ErrorResponse( _ => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest, RPCResponseErrorCode::InvalidRequest,
"Invalid forkname for blobsbyrange".to_string(), "Invalid fork name for blobs by range".to_string(),
)), )),
} }
} }
@ -597,12 +598,12 @@ fn handle_v1_response<T: EthSpec>(
) )
})?; })?;
match fork_name { match fork_name {
ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRoot(Arc::new( ForkName::Eip4844 => Ok(Some(RPCResponse::BlockAndBlobsByRoot(
SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?, SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?,
)))), ))),
_ => Err(RPCError::ErrorResponse( _ => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest, RPCResponseErrorCode::InvalidRequest,
"Invalid forkname for blobsbyroot".to_string(), "Invalid fork name for block and blobs by root".to_string(),
)), )),
} }
} }

View File

@ -281,7 +281,7 @@ pub enum RPCResponse<T: EthSpec> {
LightClientBootstrap(LightClientBootstrap<T>), LightClientBootstrap(LightClientBootstrap<T>),
/// A response to a get BLOBS_BY_ROOT request. /// A response to a get BLOBS_BY_ROOT request.
BlobsByRoot(Arc<SignedBeaconBlockAndBlobsSidecar<T>>), BlockAndBlobsByRoot(SignedBeaconBlockAndBlobsSidecar<T>),
/// A PONG response to a PING request. /// A PONG response to a PING request.
Pong(Ping), Pong(Ping),
@ -372,7 +372,7 @@ impl<T: EthSpec> RPCCodedResponse<T> {
RPCResponse::BlocksByRange(_) => true, RPCResponse::BlocksByRange(_) => true,
RPCResponse::BlocksByRoot(_) => true, RPCResponse::BlocksByRoot(_) => true,
RPCResponse::BlobsByRange(_) => true, RPCResponse::BlobsByRange(_) => true,
RPCResponse::BlobsByRoot(_) => true, RPCResponse::BlockAndBlobsByRoot(_) => true,
RPCResponse::Pong(_) => false, RPCResponse::Pong(_) => false,
RPCResponse::MetaData(_) => false, RPCResponse::MetaData(_) => false,
RPCResponse::LightClientBootstrap(_) => false, RPCResponse::LightClientBootstrap(_) => false,
@ -409,7 +409,7 @@ impl<T: EthSpec> RPCResponse<T> {
RPCResponse::BlocksByRange(_) => Protocol::BlocksByRange, RPCResponse::BlocksByRange(_) => Protocol::BlocksByRange,
RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot,
RPCResponse::BlobsByRange(_) => Protocol::BlobsByRange, RPCResponse::BlobsByRange(_) => Protocol::BlobsByRange,
RPCResponse::BlobsByRoot(_) => Protocol::BlobsByRoot, RPCResponse::BlockAndBlobsByRoot(_) => Protocol::BlobsByRoot,
RPCResponse::Pong(_) => Protocol::Ping, RPCResponse::Pong(_) => Protocol::Ping,
RPCResponse::MetaData(_) => Protocol::MetaData, RPCResponse::MetaData(_) => Protocol::MetaData,
RPCResponse::LightClientBootstrap(_) => Protocol::LightClientBootstrap, RPCResponse::LightClientBootstrap(_) => Protocol::LightClientBootstrap,
@ -449,7 +449,7 @@ impl<T: EthSpec> std::fmt::Display for RPCResponse<T> {
RPCResponse::BlobsByRange(blob) => { RPCResponse::BlobsByRange(blob) => {
write!(f, "BlobsByRange: Blob slot: {}", blob.beacon_block_slot) write!(f, "BlobsByRange: Blob slot: {}", blob.beacon_block_slot)
} }
RPCResponse::BlobsByRoot(blob) => { RPCResponse::BlockAndBlobsByRoot(blob) => {
write!( write!(
f, f,
"BlobsByRoot: Blob slot: {}", "BlobsByRoot: Blob slot: {}",

View File

@ -83,7 +83,7 @@ pub enum Response<TSpec: EthSpec> {
/// A response to a LightClientUpdate request. /// A response to a LightClientUpdate request.
LightClientBootstrap(LightClientBootstrap<TSpec>), LightClientBootstrap(LightClientBootstrap<TSpec>),
/// A response to a get BLOBS_BY_ROOT request. /// A response to a get BLOBS_BY_ROOT request.
BlobsByRoot(Option<Arc<SignedBeaconBlockAndBlobsSidecar<TSpec>>>), BlobsByRoot(Option<SignedBeaconBlockAndBlobsSidecar<TSpec>>),
} }
impl<TSpec: EthSpec> std::convert::From<Response<TSpec>> for RPCCodedResponse<TSpec> { impl<TSpec: EthSpec> std::convert::From<Response<TSpec>> for RPCCodedResponse<TSpec> {
@ -98,7 +98,7 @@ impl<TSpec: EthSpec> std::convert::From<Response<TSpec>> for RPCCodedResponse<TS
None => RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange), None => RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange),
}, },
Response::BlobsByRoot(r) => match r { Response::BlobsByRoot(r) => match r {
Some(b) => RPCCodedResponse::Success(RPCResponse::BlobsByRoot(b)), Some(b) => RPCCodedResponse::Success(RPCResponse::BlockAndBlobsByRoot(b)),
None => RPCCodedResponse::StreamTermination(ResponseTermination::BlobsByRoot), None => RPCCodedResponse::StreamTermination(ResponseTermination::BlobsByRoot),
}, },
Response::BlobsByRange(r) => match r { Response::BlobsByRange(r) => match r {

View File

@ -1315,7 +1315,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
RPCResponse::BlocksByRoot(resp) => { RPCResponse::BlocksByRoot(resp) => {
self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp)))
} }
RPCResponse::BlobsByRoot(resp) => { RPCResponse::BlockAndBlobsByRoot(resp) => {
self.build_response(id, peer_id, Response::BlobsByRoot(Some(resp))) self.build_response(id, peer_id, Response::BlobsByRoot(Some(resp)))
} }
// Should never be reached // Should never be reached

View File

@ -1699,7 +1699,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
message_id, message_id,
peer_id, peer_id,
peer_client, peer_client,
BlockWrapper::Block(block), block.into(),
work_reprocessing_tx, work_reprocessing_tx,
duplicate_cache, duplicate_cache,
seen_timestamp, seen_timestamp,
@ -1721,7 +1721,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
message_id, message_id,
peer_id, peer_id,
peer_client, peer_client,
BlockWrapper::BlockAndBlob(block_sidecar_pair), block_sidecar_pair.into(),
work_reprocessing_tx, work_reprocessing_tx,
duplicate_cache, duplicate_cache,
seen_timestamp, seen_timestamp,

View File

@ -230,10 +230,10 @@ impl<T: BeaconChainTypes> Worker<T> {
Ok((Some(block), Some(blobs))) => { Ok((Some(block), Some(blobs))) => {
self.send_response( self.send_response(
peer_id, peer_id,
Response::BlobsByRoot(Some(Arc::new(SignedBeaconBlockAndBlobsSidecar { Response::BlobsByRoot(Some(SignedBeaconBlockAndBlobsSidecar {
beacon_block: block, beacon_block: block,
blobs_sidecar: blobs, blobs_sidecar: blobs,
}))), })),
request_id, request_id,
); );
send_block_count += 1; send_block_count += 1;

View File

@ -188,14 +188,7 @@ impl<T: BeaconChainTypes> Worker<T> {
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
let sent_blocks = downloaded_blocks.len(); let sent_blocks = downloaded_blocks.len();
let unwrapped = downloaded_blocks let unwrapped = downloaded_blocks.into_iter().map(|_| todo!()).collect();
.into_iter()
.map(|block| match block {
BlockWrapper::Block(block) => block,
//FIXME(sean) handle blobs in backfill
BlockWrapper::BlockAndBlob(_) => todo!(),
})
.collect();
match self.process_backfill_blocks(unwrapped) { match self.process_backfill_blocks(unwrapped) {
(_, Ok(_)) => { (_, Ok(_)) => {

View File

@ -223,10 +223,10 @@ impl<T: BeaconChainTypes> Processor<T> {
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => { SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
unreachable!("Block lookups do not request BBRange requests") unreachable!("Block lookups do not request BBRange requests")
} }
id @ (SyncId::BackFillSync { .. } id @ (SyncId::BackFillBlocks { .. }
| SyncId::RangeSync { .. } | SyncId::RangeBlocks { .. }
| SyncId::BackFillSidecarPair { .. } | SyncId::BackFillBlobs { .. }
| SyncId::RangeSidecarPair { .. }) => id, | SyncId::RangeBlobs { .. }) => id,
}, },
RequestId::Router => unreachable!("All BBRange requests belong to sync"), RequestId::Router => unreachable!("All BBRange requests belong to sync"),
}; };
@ -258,7 +258,7 @@ impl<T: BeaconChainTypes> Processor<T> {
); );
if let RequestId::Sync(id) = request_id { if let RequestId::Sync(id) = request_id {
self.send_to_sync(SyncMessage::RpcGlob { self.send_to_sync(SyncMessage::RpcBlobs {
peer_id, peer_id,
request_id: id, request_id: id,
blob_sidecar, blob_sidecar,
@ -282,10 +282,10 @@ impl<T: BeaconChainTypes> Processor<T> {
let request_id = match request_id { let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id { RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillSync { .. } SyncId::BackFillBlocks { .. }
| SyncId::RangeSync { .. } | SyncId::RangeBlocks { .. }
| SyncId::RangeSidecarPair { .. } | SyncId::RangeBlobs { .. }
| SyncId::BackFillSidecarPair { .. } => { | SyncId::BackFillBlobs { .. } => {
unreachable!("Batch syncing do not request BBRoot requests") unreachable!("Batch syncing do not request BBRoot requests")
} }
}, },
@ -310,15 +310,15 @@ impl<T: BeaconChainTypes> Processor<T> {
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
request_id: RequestId, request_id: RequestId,
block_and_blobs: Option<Arc<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>>, block_and_blobs: Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
) { ) {
let request_id = match request_id { let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id { RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillSync { .. } SyncId::BackFillBlocks { .. }
| SyncId::RangeSync { .. } | SyncId::RangeBlocks { .. }
| SyncId::RangeSidecarPair { .. } | SyncId::RangeBlobs { .. }
| SyncId::BackFillSidecarPair { .. } => { | SyncId::BackFillBlobs { .. } => {
unreachable!("Batch syncing does not request BBRoot requests") unreachable!("Batch syncing does not request BBRoot requests")
} }
}, },
@ -330,7 +330,7 @@ impl<T: BeaconChainTypes> Processor<T> {
"Received BlockAndBlobssByRoot Response"; "Received BlockAndBlobssByRoot Response";
"peer" => %peer_id, "peer" => %peer_id,
); );
self.send_to_sync(SyncMessage::RpcBlockAndGlob { self.send_to_sync(SyncMessage::RpcBlockAndBlobs {
peer_id, peer_id,
request_id, request_id,
block_and_blobs, block_and_blobs,

View File

@ -536,7 +536,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id); let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id);
self.current_processing_batch = Some(batch_id); self.current_processing_batch = Some(batch_id);
let work_event = BeaconWorkEvent::chain_segment(process_id, blocks.into_wrapped_blocks()); let work_event = BeaconWorkEvent::chain_segment(process_id, blocks);
if let Err(e) = network.processor_channel().try_send(work_event) { if let Err(e) = network.processor_channel().try_send(work_event) {
crit!(self.log, "Failed to send backfill segment to processor."; "msg" => "process_batch", crit!(self.log, "Failed to send backfill segment to processor."; "msg" => "process_batch",
"error" => %e, "batch" => self.processing_target); "error" => %e, "batch" => self.processing_target);

View File

@ -1,12 +1,9 @@
use std::{collections::VecDeque, sync::Arc}; use std::{collections::VecDeque, sync::Arc};
use types::{ use types::{signed_block_and_blobs::BlockWrapper, BlobsSidecar, EthSpec, SignedBeaconBlock};
signed_block_and_blobs::BlockWrapper, BlobsSidecar, EthSpec, SignedBeaconBlock,
SignedBeaconBlockAndBlobsSidecar,
};
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct BlockBlobRequestInfo<T: EthSpec> { pub struct BlocksAndBlobsRequestInfo<T: EthSpec> {
/// Blocks we have received awaiting for their corresponding sidecar. /// Blocks we have received awaiting for their corresponding sidecar.
accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<T>>>, accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<T>>>,
/// Sidecars we have received awaiting for their corresponding block. /// Sidecars we have received awaiting for their corresponding block.
@ -17,7 +14,7 @@ pub struct BlockBlobRequestInfo<T: EthSpec> {
is_sidecars_stream_terminated: bool, is_sidecars_stream_terminated: bool,
} }
impl<T: EthSpec> BlockBlobRequestInfo<T> { impl<T: EthSpec> BlocksAndBlobsRequestInfo<T> {
pub fn add_block_response(&mut self, maybe_block: Option<Arc<SignedBeaconBlock<T>>>) { pub fn add_block_response(&mut self, maybe_block: Option<Arc<SignedBeaconBlock<T>>>) {
match maybe_block { match maybe_block {
Some(block) => self.accumulated_blocks.push_back(block), Some(block) => self.accumulated_blocks.push_back(block),
@ -33,7 +30,7 @@ impl<T: EthSpec> BlockBlobRequestInfo<T> {
} }
pub fn into_responses(self) -> Result<Vec<BlockWrapper<T>>, &'static str> { pub fn into_responses(self) -> Result<Vec<BlockWrapper<T>>, &'static str> {
let BlockBlobRequestInfo { let BlocksAndBlobsRequestInfo {
accumulated_blocks, accumulated_blocks,
mut accumulated_sidecars, mut accumulated_sidecars,
.. ..
@ -51,14 +48,9 @@ impl<T: EthSpec> BlockBlobRequestInfo<T> {
{ {
let blobs_sidecar = let blobs_sidecar =
accumulated_sidecars.pop_front().ok_or("missing sidecar")?; accumulated_sidecars.pop_front().ok_or("missing sidecar")?;
Ok(BlockWrapper::BlockAndBlob( Ok(BlockWrapper::new_with_blobs(beacon_block, blobs_sidecar))
SignedBeaconBlockAndBlobsSidecar {
beacon_block,
blobs_sidecar,
},
))
} else { } else {
Ok(BlockWrapper::Block(beacon_block)) Ok(beacon_block.into())
} }
}) })
.collect::<Result<Vec<_>, _>>(); .collect::<Result<Vec<_>, _>>();

View File

@ -35,13 +35,13 @@
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::BlockLookups; use super::block_lookups::BlockLookups;
use super::network_context::{BlockOrBlob, SyncNetworkContext}; use super::network_context::{BlockOrBlobs, SyncNetworkContext};
use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
use crate::service::NetworkMessage; use crate::service::NetworkMessage;
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use crate::sync::range_sync::ExpectedBatchTy; use crate::sync::range_sync::ByRangeRequestType;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
use futures::StreamExt; use futures::StreamExt;
use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
@ -79,13 +79,13 @@ pub enum RequestId {
/// Request searching for a block's parent. The id is the chain /// Request searching for a block's parent. The id is the chain
ParentLookup { id: Id }, ParentLookup { id: Id },
/// Request was from the backfill sync algorithm. /// Request was from the backfill sync algorithm.
BackFillSync { id: Id }, BackFillBlocks { id: Id },
/// Backfill request for blocks and sidecars. /// Backfill request for blob sidecars.
BackFillSidecarPair { id: Id }, BackFillBlobs { id: Id },
/// The request was from a chain in the range sync algorithm. /// The request was from a chain in the range sync algorithm.
RangeSync { id: Id }, RangeBlocks { id: Id },
/// The request was from a chain in range, asking for ranges of blocks and sidecars. /// The request was from a chain in range, asking for ranges blob sidecars.
RangeSidecarPair { id: Id }, RangeBlobs { id: Id },
} }
#[derive(Debug)] #[derive(Debug)]
@ -103,7 +103,7 @@ pub enum SyncMessage<T: EthSpec> {
}, },
/// A blob has been received from the RPC. /// A blob has been received from the RPC.
RpcGlob { RpcBlobs {
request_id: RequestId, request_id: RequestId,
peer_id: PeerId, peer_id: PeerId,
blob_sidecar: Option<Arc<BlobsSidecar<T>>>, blob_sidecar: Option<Arc<BlobsSidecar<T>>>,
@ -111,10 +111,10 @@ pub enum SyncMessage<T: EthSpec> {
}, },
/// A block and blobs have been received from the RPC. /// A block and blobs have been received from the RPC.
RpcBlockAndGlob { RpcBlockAndBlobs {
request_id: RequestId, request_id: RequestId,
peer_id: PeerId, peer_id: PeerId,
block_and_blobs: Option<Arc<SignedBeaconBlockAndBlobsSidecar<T>>>, block_and_blobs: Option<SignedBeaconBlockAndBlobsSidecar<T>>,
seen_timestamp: Duration, seen_timestamp: Duration,
}, },
@ -295,10 +295,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.block_lookups self.block_lookups
.parent_lookup_failed(id, peer_id, &mut self.network, error); .parent_lookup_failed(id, peer_id, &mut self.network, error);
} }
RequestId::BackFillSync { id } => { RequestId::BackFillBlocks { id } => {
if let Some(batch_id) = self if let Some(batch_id) = self
.network .network
.backfill_request_failed(id, ExpectedBatchTy::OnlyBlock) .backfill_request_failed(id, ByRangeRequestType::Blocks)
{ {
match self match self
.backfill_sync .backfill_sync
@ -310,10 +310,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
} }
RequestId::BackFillSidecarPair { id } => { RequestId::BackFillBlobs { id } => {
if let Some(batch_id) = self if let Some(batch_id) = self
.network .network
.backfill_request_failed(id, ExpectedBatchTy::OnlyBlockBlobs) .backfill_request_failed(id, ByRangeRequestType::BlocksAndBlobs)
{ {
match self match self
.backfill_sync .backfill_sync
@ -324,10 +324,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
} }
} }
RequestId::RangeSync { id } => { RequestId::RangeBlocks { id } => {
if let Some((chain_id, batch_id)) = self if let Some((chain_id, batch_id)) = self
.network .network
.range_sync_request_failed(id, ExpectedBatchTy::OnlyBlock) .range_sync_request_failed(id, ByRangeRequestType::Blocks)
{ {
self.range_sync.inject_error( self.range_sync.inject_error(
&mut self.network, &mut self.network,
@ -339,10 +339,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.update_sync_state() self.update_sync_state()
} }
} }
RequestId::RangeSidecarPair { id } => { RequestId::RangeBlobs { id } => {
if let Some((chain_id, batch_id)) = self if let Some((chain_id, batch_id)) = self
.network .network
.range_sync_request_failed(id, ExpectedBatchTy::OnlyBlockBlobs) .range_sync_request_failed(id, ByRangeRequestType::BlocksAndBlobs)
{ {
self.range_sync.inject_error( self.range_sync.inject_error(
&mut self.network, &mut self.network,
@ -648,18 +648,18 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.block_lookups .block_lookups
.parent_chain_processed(chain_hash, result, &mut self.network), .parent_chain_processed(chain_hash, result, &mut self.network),
}, },
SyncMessage::RpcGlob { SyncMessage::RpcBlobs {
request_id, request_id,
peer_id, peer_id,
blob_sidecar, blob_sidecar,
seen_timestamp, seen_timestamp,
} => self.rpc_sidecar_received(request_id, peer_id, blob_sidecar, seen_timestamp), } => self.rpc_blobs_received(request_id, peer_id, blob_sidecar, seen_timestamp),
SyncMessage::RpcBlockAndGlob { SyncMessage::RpcBlockAndBlobs {
request_id, request_id,
peer_id, peer_id,
block_and_blobs, block_and_blobs,
seen_timestamp, seen_timestamp,
} => self.rpc_block_sidecar_pair_received( } => self.rpc_block_block_and_blobs_received(
request_id, request_id,
peer_id, peer_id,
block_and_blobs, block_and_blobs,
@ -734,18 +734,18 @@ impl<T: BeaconChainTypes> SyncManager<T> {
RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response(
id, id,
peer_id, peer_id,
beacon_block.map(|block| BlockWrapper::Block(block)), beacon_block.map(|block| block.into()),
seen_timestamp, seen_timestamp,
&mut self.network, &mut self.network,
), ),
RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response(
id, id,
peer_id, peer_id,
beacon_block.map(|block| BlockWrapper::Block(block)), beacon_block.map(|block| block.into()),
seen_timestamp, seen_timestamp,
&mut self.network, &mut self.network,
), ),
RequestId::BackFillSync { id } => { RequestId::BackFillBlocks { id } => {
let is_stream_terminator = beacon_block.is_none(); let is_stream_terminator = beacon_block.is_none();
if let Some(batch_id) = self if let Some(batch_id) = self
.network .network
@ -756,7 +756,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
batch_id, batch_id,
&peer_id, &peer_id,
id, id,
beacon_block.map(|block| BlockWrapper::Block(block)), beacon_block.map(|block| block.into()),
) { ) {
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
Ok(ProcessResult::Successful) => {} Ok(ProcessResult::Successful) => {}
@ -768,7 +768,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
} }
} }
RequestId::RangeSync { id } => { RequestId::RangeBlocks { id } => {
let is_stream_terminator = beacon_block.is_none(); let is_stream_terminator = beacon_block.is_none();
if let Some((chain_id, batch_id)) = self if let Some((chain_id, batch_id)) = self
.network .network
@ -780,28 +780,28 @@ impl<T: BeaconChainTypes> SyncManager<T> {
chain_id, chain_id,
batch_id, batch_id,
id, id,
beacon_block.map(|block| BlockWrapper::Block(block)), beacon_block.map(|block| block.into()),
); );
self.update_sync_state(); self.update_sync_state();
} }
} }
RequestId::BackFillSidecarPair { id } => { RequestId::BackFillBlobs { id } => {
self.block_blob_backfill_response(id, peer_id, beacon_block.into()) self.blobs_backfill_response(id, peer_id, beacon_block.into())
} }
RequestId::RangeSidecarPair { id } => { RequestId::RangeBlobs { id } => {
self.block_blob_range_response(id, peer_id, beacon_block.into()) self.blobs_range_response(id, peer_id, beacon_block.into())
} }
} }
} }
/// Handles receiving a response for a range sync request that should have both blocks and /// Handles receiving a response for a range sync request that should have both blocks and
/// blobs. /// blobs.
fn block_blob_range_response( fn blobs_range_response(
&mut self, &mut self,
id: Id, id: Id,
peer_id: PeerId, peer_id: PeerId,
block_or_blob: BlockOrBlob<T::EthSpec>, block_or_blob: BlockOrBlobs<T::EthSpec>,
) { ) {
if let Some((chain_id, batch_id, block_responses)) = self if let Some((chain_id, batch_id, block_responses)) = self
.network .network
@ -834,7 +834,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"peer_id" => %peer_id, "batch_id" => batch_id, "error" => e "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e
); );
// TODO: penalize the peer for being a bad boy // TODO: penalize the peer for being a bad boy
let id = RequestId::RangeSidecarPair { id }; let id = RequestId::RangeBlobs { id };
self.inject_error(peer_id, id, RPCError::InvalidData(e.into())) self.inject_error(peer_id, id, RPCError::InvalidData(e.into()))
} }
} }
@ -843,11 +843,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// Handles receiving a response for a Backfill sync request that should have both blocks and /// Handles receiving a response for a Backfill sync request that should have both blocks and
/// blobs. /// blobs.
fn block_blob_backfill_response( fn blobs_backfill_response(
&mut self, &mut self,
id: Id, id: Id,
peer_id: PeerId, peer_id: PeerId,
block_or_blob: BlockOrBlob<T::EthSpec>, block_or_blob: BlockOrBlobs<T::EthSpec>,
) { ) {
if let Some((batch_id, block_responses)) = self if let Some((batch_id, block_responses)) = self
.network .network
@ -886,14 +886,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"peer_id" => %peer_id, "batch_id" => batch_id, "error" => e "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e
); );
// TODO: penalize the peer for being a bad boy // TODO: penalize the peer for being a bad boy
let id = RequestId::BackFillSidecarPair { id }; let id = RequestId::BackFillBlobs { id };
self.inject_error(peer_id, id, RPCError::InvalidData(e.into())) self.inject_error(peer_id, id, RPCError::InvalidData(e.into()))
} }
} }
} }
} }
fn rpc_sidecar_received( fn rpc_blobs_received(
&mut self, &mut self,
request_id: RequestId, request_id: RequestId,
peer_id: PeerId, peer_id: PeerId,
@ -904,57 +904,47 @@ impl<T: BeaconChainTypes> SyncManager<T> {
RequestId::SingleBlock { .. } | RequestId::ParentLookup { .. } => { RequestId::SingleBlock { .. } | RequestId::ParentLookup { .. } => {
unreachable!("There is no such thing as a singular 'by root' glob request that is not accompanied by the block") unreachable!("There is no such thing as a singular 'by root' glob request that is not accompanied by the block")
} }
RequestId::BackFillSync { .. } => { RequestId::BackFillBlocks { .. } => {
unreachable!("An only blocks request does not receive sidecars") unreachable!("An only blocks request does not receive sidecars")
} }
RequestId::BackFillSidecarPair { id } => { RequestId::BackFillBlobs { id } => {
self.block_blob_backfill_response(id, peer_id, maybe_sidecar.into()) self.blobs_backfill_response(id, peer_id, maybe_sidecar.into())
} }
RequestId::RangeSync { .. } => { RequestId::RangeBlocks { .. } => {
unreachable!("Only-blocks range requests don't receive sidecars") unreachable!("Only-blocks range requests don't receive sidecars")
} }
RequestId::RangeSidecarPair { id } => { RequestId::RangeBlobs { id } => {
self.block_blob_range_response(id, peer_id, maybe_sidecar.into()) self.blobs_range_response(id, peer_id, maybe_sidecar.into())
} }
} }
} }
fn rpc_block_sidecar_pair_received( fn rpc_block_block_and_blobs_received(
&mut self, &mut self,
request_id: RequestId, request_id: RequestId,
peer_id: PeerId, peer_id: PeerId,
block_sidecar_pair: Option<Arc<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>>, block_sidecar_pair: Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
seen_timestamp: Duration, seen_timestamp: Duration,
) { ) {
match request_id { match request_id {
RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response(
id, id,
peer_id, peer_id,
block_sidecar_pair.map(|block_sidecar_pair| { block_sidecar_pair.map(|block_sidecar_pair| block_sidecar_pair.into()),
BlockWrapper::BlockAndBlob(
// TODO: why is this in an arc
(*block_sidecar_pair).clone(),
)
}),
seen_timestamp, seen_timestamp,
&mut self.network, &mut self.network,
), ),
RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response(
id, id,
peer_id, peer_id,
block_sidecar_pair.map(|block_sidecar_pair| { block_sidecar_pair.map(|block_sidecar_pair| block_sidecar_pair.into()),
BlockWrapper::BlockAndBlob(
// TODO: why is this in an arc
(*block_sidecar_pair).clone(),
)
}),
seen_timestamp, seen_timestamp,
&mut self.network, &mut self.network,
), ),
RequestId::BackFillSync { .. } RequestId::BackFillBlocks { .. }
| RequestId::BackFillSidecarPair { .. } | RequestId::BackFillBlobs { .. }
| RequestId::RangeSync { .. } | RequestId::RangeBlocks { .. }
| RequestId::RangeSidecarPair { .. } => unreachable!( | RequestId::RangeBlobs { .. } => unreachable!(
"since range requests are not block-glob coupled, this should never be reachable" "since range requests are not block-glob coupled, this should never be reachable"
), ),
} }

View File

@ -1,9 +1,9 @@
//! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! Provides network functionality for the Syncing thread. This fundamentally wraps a network
//! channel and stores a global RPC ID to perform requests. //! channel and stores a global RPC ID to perform requests.
use super::block_sidecar_coupling::BlockBlobRequestInfo; use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
use super::manager::{Id, RequestId as SyncRequestId}; use super::manager::{Id, RequestId as SyncRequestId};
use super::range_sync::{BatchId, ChainId, ExpectedBatchTy}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
use crate::beacon_processor::WorkEvent; use crate::beacon_processor::WorkEvent;
use crate::service::{NetworkMessage, RequestId}; use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
@ -38,11 +38,12 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
backfill_requests: FnvHashMap<Id, BatchId>, backfill_requests: FnvHashMap<Id, BatchId>,
/// BlocksByRange requests paired with BlobsByRange requests made by the range. /// BlocksByRange requests paired with BlobsByRange requests made by the range.
range_sidecar_pair_requests: range_blocks_and_blobs_requests:
FnvHashMap<Id, (ChainId, BatchId, BlockBlobRequestInfo<T::EthSpec>)>, FnvHashMap<Id, (ChainId, BatchId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
/// BlocksByRange requests paired with BlobsByRange requests made by the backfill sync. /// BlocksByRange requests paired with BlobsByRange requests made by the backfill sync.
backfill_sidecar_pair_requests: FnvHashMap<Id, (BatchId, BlockBlobRequestInfo<T::EthSpec>)>, backfill_blocks_and_blobs_requests:
FnvHashMap<Id, (BatchId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
/// Whether the ee is online. If it's not, we don't allow access to the /// Whether the ee is online. If it's not, we don't allow access to the
/// `beacon_processor_send`. /// `beacon_processor_send`.
@ -58,20 +59,20 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
} }
/// Small enumeration to make dealing with block and blob requests easier. /// Small enumeration to make dealing with block and blob requests easier.
pub enum BlockOrBlob<T: EthSpec> { pub enum BlockOrBlobs<T: EthSpec> {
Block(Option<Arc<SignedBeaconBlock<T>>>), Block(Option<Arc<SignedBeaconBlock<T>>>),
Blob(Option<Arc<BlobsSidecar<T>>>), Blobs(Option<Arc<BlobsSidecar<T>>>),
} }
impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlob<T> { impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlobs<T> {
fn from(block: Option<Arc<SignedBeaconBlock<T>>>) -> Self { fn from(block: Option<Arc<SignedBeaconBlock<T>>>) -> Self {
BlockOrBlob::Block(block) BlockOrBlobs::Block(block)
} }
} }
impl<T: EthSpec> From<Option<Arc<BlobsSidecar<T>>>> for BlockOrBlob<T> { impl<T: EthSpec> From<Option<Arc<BlobsSidecar<T>>>> for BlockOrBlobs<T> {
fn from(blob: Option<Arc<BlobsSidecar<T>>>) -> Self { fn from(blob: Option<Arc<BlobsSidecar<T>>>) -> Self {
BlockOrBlob::Blob(blob) BlockOrBlobs::Blobs(blob)
} }
} }
@ -89,8 +90,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: 1, request_id: 1,
range_requests: Default::default(), range_requests: Default::default(),
backfill_requests: Default::default(), backfill_requests: Default::default(),
range_sidecar_pair_requests: Default::default(), range_blocks_and_blobs_requests: Default::default(),
backfill_sidecar_pair_requests: Default::default(), backfill_blocks_and_blobs_requests: Default::default(),
execution_engine_state: EngineState::Online, // always assume `Online` at the start execution_engine_state: EngineState::Online, // always assume `Online` at the start
beacon_processor_send, beacon_processor_send,
chain, chain,
@ -140,13 +141,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn blocks_by_range_request( pub fn blocks_by_range_request(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
batch_type: ExpectedBatchTy, batch_type: ByRangeRequestType,
request: BlocksByRangeRequest, request: BlocksByRangeRequest,
chain_id: ChainId, chain_id: ChainId,
batch_id: BatchId, batch_id: BatchId,
) -> Result<Id, &'static str> { ) -> Result<Id, &'static str> {
match batch_type { match batch_type {
ExpectedBatchTy::OnlyBlock => { ByRangeRequestType::Blocks => {
trace!( trace!(
self.log, self.log,
"Sending BlocksByRange request"; "Sending BlocksByRange request";
@ -156,7 +157,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
); );
let request = Request::BlocksByRange(request); let request = Request::BlocksByRange(request);
let id = self.next_id(); let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::RangeSync { id }); let request_id = RequestId::Sync(SyncRequestId::RangeBlocks { id });
self.send_network_msg(NetworkMessage::SendRequest { self.send_network_msg(NetworkMessage::SendRequest {
peer_id, peer_id,
request, request,
@ -165,7 +166,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.range_requests.insert(id, (chain_id, batch_id)); self.range_requests.insert(id, (chain_id, batch_id));
Ok(id) Ok(id)
} }
ExpectedBatchTy::OnlyBlockBlobs => { ByRangeRequestType::BlocksAndBlobs => {
debug!( debug!(
self.log, self.log,
"Sending BlocksByRange and BlobsByRange requests"; "Sending BlocksByRange and BlobsByRange requests";
@ -176,7 +177,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// create the shared request id. This is fine since the rpc handles substream ids. // create the shared request id. This is fine since the rpc handles substream ids.
let id = self.next_id(); let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::RangeSidecarPair { id }); let request_id = RequestId::Sync(SyncRequestId::RangeBlobs { id });
// Create the blob request based on the blob request. // Create the blob request based on the blob request.
let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { let blobs_request = Request::BlobsByRange(BlobsByRangeRequest {
@ -196,8 +197,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: blobs_request, request: blobs_request,
request_id, request_id,
})?; })?;
let block_blob_info = BlockBlobRequestInfo::default(); let block_blob_info = BlocksAndBlobsRequestInfo::default();
self.range_sidecar_pair_requests self.range_blocks_and_blobs_requests
.insert(id, (chain_id, batch_id, block_blob_info)); .insert(id, (chain_id, batch_id, block_blob_info));
Ok(id) Ok(id)
} }
@ -208,12 +209,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn backfill_blocks_by_range_request( pub fn backfill_blocks_by_range_request(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
batch_type: ExpectedBatchTy, batch_type: ByRangeRequestType,
request: BlocksByRangeRequest, request: BlocksByRangeRequest,
batch_id: BatchId, batch_id: BatchId,
) -> Result<Id, &'static str> { ) -> Result<Id, &'static str> {
match batch_type { match batch_type {
ExpectedBatchTy::OnlyBlock => { ByRangeRequestType::Blocks => {
trace!( trace!(
self.log, self.log,
"Sending backfill BlocksByRange request"; "Sending backfill BlocksByRange request";
@ -223,7 +224,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
); );
let request = Request::BlocksByRange(request); let request = Request::BlocksByRange(request);
let id = self.next_id(); let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::BackFillSync { id }); let request_id = RequestId::Sync(SyncRequestId::BackFillBlocks { id });
self.send_network_msg(NetworkMessage::SendRequest { self.send_network_msg(NetworkMessage::SendRequest {
peer_id, peer_id,
request, request,
@ -232,7 +233,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.backfill_requests.insert(id, batch_id); self.backfill_requests.insert(id, batch_id);
Ok(id) Ok(id)
} }
ExpectedBatchTy::OnlyBlockBlobs => { ByRangeRequestType::BlocksAndBlobs => {
debug!( debug!(
self.log, self.log,
"Sending backfill BlocksByRange and BlobsByRange requests"; "Sending backfill BlocksByRange and BlobsByRange requests";
@ -243,7 +244,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// create the shared request id. This is fine since the rpc handles substream ids. // create the shared request id. This is fine since the rpc handles substream ids.
let id = self.next_id(); let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::BackFillSidecarPair { id }); let request_id = RequestId::Sync(SyncRequestId::BackFillBlobs { id });
// Create the blob request based on the blob request. // Create the blob request based on the blob request.
let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { let blobs_request = Request::BlobsByRange(BlobsByRangeRequest {
@ -263,8 +264,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: blobs_request, request: blobs_request,
request_id, request_id,
})?; })?;
let block_blob_info = BlockBlobRequestInfo::default(); let block_blob_info = BlocksAndBlobsRequestInfo::default();
self.backfill_sidecar_pair_requests self.backfill_blocks_and_blobs_requests
.insert(id, (batch_id, block_blob_info)); .insert(id, (batch_id, block_blob_info));
Ok(id) Ok(id)
} }
@ -288,18 +289,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn range_sync_block_and_blob_response( pub fn range_sync_block_and_blob_response(
&mut self, &mut self,
request_id: Id, request_id: Id,
block_or_blob: BlockOrBlob<T::EthSpec>, block_or_blob: BlockOrBlobs<T::EthSpec>,
) -> Option<( ) -> Option<(
ChainId, ChainId,
BatchId, BatchId,
Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>, Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>,
)> { )> {
match self.range_sidecar_pair_requests.entry(request_id) { match self.range_blocks_and_blobs_requests.entry(request_id) {
Entry::Occupied(mut entry) => { Entry::Occupied(mut entry) => {
let (_, _, info) = entry.get_mut(); let (_, _, info) = entry.get_mut();
match block_or_blob { match block_or_blob {
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
} }
if info.is_finished() { if info.is_finished() {
// If the request is finished, dequeue everything // If the request is finished, dequeue everything
@ -316,28 +317,28 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn range_sync_request_failed( pub fn range_sync_request_failed(
&mut self, &mut self,
request_id: Id, request_id: Id,
batch_type: ExpectedBatchTy, batch_type: ByRangeRequestType,
) -> Option<(ChainId, BatchId)> { ) -> Option<(ChainId, BatchId)> {
match batch_type { match batch_type {
ExpectedBatchTy::OnlyBlockBlobs => self ByRangeRequestType::BlocksAndBlobs => self
.range_sidecar_pair_requests .range_blocks_and_blobs_requests
.remove(&request_id) .remove(&request_id)
.map(|(chain_id, batch_id, _info)| (chain_id, batch_id)), .map(|(chain_id, batch_id, _info)| (chain_id, batch_id)),
ExpectedBatchTy::OnlyBlock => self.range_requests.remove(&request_id), ByRangeRequestType::Blocks => self.range_requests.remove(&request_id),
} }
} }
pub fn backfill_request_failed( pub fn backfill_request_failed(
&mut self, &mut self,
request_id: Id, request_id: Id,
batch_type: ExpectedBatchTy, batch_type: ByRangeRequestType,
) -> Option<BatchId> { ) -> Option<BatchId> {
match batch_type { match batch_type {
ExpectedBatchTy::OnlyBlockBlobs => self ByRangeRequestType::BlocksAndBlobs => self
.backfill_sidecar_pair_requests .backfill_blocks_and_blobs_requests
.remove(&request_id) .remove(&request_id)
.map(|(batch_id, _info)| batch_id), .map(|(batch_id, _info)| batch_id),
ExpectedBatchTy::OnlyBlock => self.backfill_requests.remove(&request_id), ByRangeRequestType::Blocks => self.backfill_requests.remove(&request_id),
} }
} }
@ -360,14 +361,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn backfill_sync_block_and_blob_response( pub fn backfill_sync_block_and_blob_response(
&mut self, &mut self,
request_id: Id, request_id: Id,
block_or_blob: BlockOrBlob<T::EthSpec>, block_or_blob: BlockOrBlobs<T::EthSpec>,
) -> Option<(BatchId, Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>)> { ) -> Option<(BatchId, Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>)> {
match self.backfill_sidecar_pair_requests.entry(request_id) { match self.backfill_blocks_and_blobs_requests.entry(request_id) {
Entry::Occupied(mut entry) => { Entry::Occupied(mut entry) => {
let (_, info) = entry.get_mut(); let (_, info) = entry.get_mut();
match block_or_blob { match block_or_blob {
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
} }
if info.is_finished() { if info.is_finished() {
// If the request is finished, dequeue everything // If the request is finished, dequeue everything
@ -533,7 +534,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
/// Check whether a batch for this epoch (and only this epoch) should request just blocks or /// Check whether a batch for this epoch (and only this epoch) should request just blocks or
/// blocks and blobs. /// blocks and blobs.
pub fn batch_type(&self, epoch: types::Epoch) -> ExpectedBatchTy { pub fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType {
const _: () = assert!( const _: () = assert!(
super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1 super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1
&& super::range_sync::EPOCHS_PER_BATCH == 1, && super::range_sync::EPOCHS_PER_BATCH == 1,
@ -542,18 +543,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
#[cfg(test)] #[cfg(test)]
{ {
// Keep tests only for blocks. // Keep tests only for blocks.
return ExpectedBatchTy::OnlyBlock; return ByRangeRequestType::Blocks;
} }
#[cfg(not(test))] #[cfg(not(test))]
{ {
if let Some(data_availability_boundary) = self.chain.data_availability_boundary() { if let Some(data_availability_boundary) = self.chain.data_availability_boundary() {
if epoch >= data_availability_boundary { if epoch >= data_availability_boundary {
ExpectedBatchTy::OnlyBlockBlobs ByRangeRequestType::BlocksAndBlobs
} else { } else {
ExpectedBatchTy::OnlyBlock ByRangeRequestType::Blocks
} }
} else { } else {
ExpectedBatchTy::OnlyBlock ByRangeRequestType::Blocks
} }
} }
} }

View File

@ -4,10 +4,9 @@ use lighthouse_network::PeerId;
use std::collections::HashSet; use std::collections::HashSet;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::ops::Sub; use std::ops::Sub;
use std::sync::Arc;
use strum::Display; use strum::Display;
use types::signed_block_and_blobs::BlockWrapper; use types::signed_block_and_blobs::BlockWrapper;
use types::{Epoch, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot}; use types::{Epoch, EthSpec, Slot};
/// The number of times to retry a batch before it is considered failed. /// The number of times to retry a batch before it is considered failed.
const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5;
@ -16,36 +15,12 @@ const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5;
/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. /// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty.
const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3;
pub enum BatchTy<T: EthSpec> {
Blocks(Vec<Arc<SignedBeaconBlock<T>>>),
BlocksAndBlobs(Vec<SignedBeaconBlockAndBlobsSidecar<T>>),
}
impl<T: EthSpec> BatchTy<T> {
pub fn into_wrapped_blocks(self) -> Vec<BlockWrapper<T>> {
match self {
BatchTy::Blocks(blocks) => blocks
.into_iter()
.map(|block| BlockWrapper::Block(block))
.collect(),
BatchTy::BlocksAndBlobs(block_sidecar_pair) => block_sidecar_pair
.into_iter()
.map(|block_sidecar_pair| BlockWrapper::BlockAndBlob(block_sidecar_pair))
.collect(),
}
}
}
/// Error representing a batch with mixed block types.
#[derive(Debug)]
pub struct MixedBlockTyErr;
/// Type of expected batch. /// Type of expected batch.
#[derive(Debug, Copy, Clone, Display)] #[derive(Debug, Copy, Clone, Display)]
#[strum(serialize_all = "snake_case")] #[strum(serialize_all = "snake_case")]
pub enum ExpectedBatchTy { pub enum ByRangeRequestType {
OnlyBlockBlobs, BlocksAndBlobs,
OnlyBlock, Blocks,
} }
/// Allows customisation of the above constants used in other sync methods such as BackFillSync. /// Allows customisation of the above constants used in other sync methods such as BackFillSync.
@ -131,7 +106,7 @@ pub struct BatchInfo<T: EthSpec, B: BatchConfig = RangeSyncBatchConfig> {
/// State of the batch. /// State of the batch.
state: BatchState<T>, state: BatchState<T>,
/// Whether this batch contains all blocks or all blocks and blobs. /// Whether this batch contains all blocks or all blocks and blobs.
batch_type: ExpectedBatchTy, batch_type: ByRangeRequestType,
/// Pin the generic /// Pin the generic
marker: std::marker::PhantomData<B>, marker: std::marker::PhantomData<B>,
} }
@ -180,7 +155,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
/// fork boundary will be of mixed type (all blocks and one last blockblob), and I don't want to /// fork boundary will be of mixed type (all blocks and one last blockblob), and I don't want to
/// deal with this for now. /// deal with this for now.
/// This means finalization might be slower in eip4844 /// This means finalization might be slower in eip4844
pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ExpectedBatchTy) -> Self { pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ByRangeRequestType) -> Self {
let start_slot = start_epoch.start_slot(T::slots_per_epoch()); let start_slot = start_epoch.start_slot(T::slots_per_epoch());
let end_slot = start_slot + num_of_epochs * T::slots_per_epoch(); let end_slot = start_slot + num_of_epochs * T::slots_per_epoch();
BatchInfo { BatchInfo {
@ -243,7 +218,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
} }
/// Returns a BlocksByRange request associated with the batch. /// Returns a BlocksByRange request associated with the batch.
pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ExpectedBatchTy) { pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) {
( (
BlocksByRangeRequest { BlocksByRangeRequest {
start_slot: self.start_slot.into(), start_slot: self.start_slot.into(),
@ -408,30 +383,11 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
} }
} }
pub fn start_processing(&mut self) -> Result<BatchTy<T>, WrongState> { pub fn start_processing(&mut self) -> Result<Vec<BlockWrapper<T>>, WrongState> {
match self.state.poison() { match self.state.poison() {
BatchState::AwaitingProcessing(peer, blocks) => { BatchState::AwaitingProcessing(peer, blocks) => {
self.state = BatchState::Processing(Attempt::new::<B, T>(peer, &blocks)); self.state = BatchState::Processing(Attempt::new::<B, T>(peer, &blocks));
match self.batch_type { Ok(blocks)
ExpectedBatchTy::OnlyBlockBlobs => {
let blocks = blocks.into_iter().map(|block| {
let BlockWrapper::BlockAndBlob(block_and_blob) = block else {
panic!("Batches should never have a mixed type. This is a bug. Contact D")
};
block_and_blob
}).collect();
Ok(BatchTy::BlocksAndBlobs(blocks))
}
ExpectedBatchTy::OnlyBlock => {
let blocks = blocks.into_iter().map(|block| {
let BlockWrapper::Block(block) = block else {
panic!("Batches should never have a mixed type. This is a bug. Contact D")
};
block
}).collect();
Ok(BatchTy::Blocks(blocks))
}
}
} }
BatchState::Poisoned => unreachable!("Poisoned batch"), BatchState::Poisoned => unreachable!("Poisoned batch"),
other => { other => {

View File

@ -332,7 +332,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized); let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized);
self.current_processing_batch = Some(batch_id); self.current_processing_batch = Some(batch_id);
let work_event = BeaconWorkEvent::chain_segment(process_id, blocks.into_wrapped_blocks()); let work_event = BeaconWorkEvent::chain_segment(process_id, blocks);
if let Err(e) = beacon_processor_send.try_send(work_event) { if let Err(e) = beacon_processor_send.try_send(work_event) {
crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch", crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch",

View File

@ -9,8 +9,8 @@ mod range;
mod sync_type; mod sync_type;
pub use batch::{ pub use batch::{
BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, BatchTy, BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
ExpectedBatchTy, ByRangeRequestType,
}; };
pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH};
pub use range::RangeSync; pub use range::RangeSync;

View File

@ -373,7 +373,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::service::RequestId; use crate::service::RequestId;
use crate::sync::range_sync::ExpectedBatchTy; use crate::sync::range_sync::ByRangeRequestType;
use crate::NetworkMessage; use crate::NetworkMessage;
use super::*; use super::*;
@ -686,7 +686,7 @@ mod tests {
let (peer1, local_info, head_info) = rig.head_peer(); let (peer1, local_info, head_info) = rig.head_peer();
range.add_peer(&mut rig.cx, local_info, peer1, head_info); range.add_peer(&mut rig.cx, local_info, peer1, head_info);
let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 { let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => { RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => {
(rig.cx.range_sync_response(id, true).unwrap(), id) (rig.cx.range_sync_response(id, true).unwrap(), id)
} }
other => panic!("unexpected request {:?}", other), other => panic!("unexpected request {:?}", other),
@ -705,7 +705,7 @@ mod tests {
let (peer2, local_info, finalized_info) = rig.finalized_peer(); let (peer2, local_info, finalized_info) = rig.finalized_peer();
range.add_peer(&mut rig.cx, local_info, peer2, finalized_info); range.add_peer(&mut rig.cx, local_info, peer2, finalized_info);
let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 { let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => { RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => {
(rig.cx.range_sync_response(id, true).unwrap(), id) (rig.cx.range_sync_response(id, true).unwrap(), id)
} }
other => panic!("unexpected request {:?}", other), other => panic!("unexpected request {:?}", other),

View File

@ -34,33 +34,56 @@ impl<T: EthSpec> SignedBeaconBlockAndBlobsSidecar<T> {
} }
} }
/// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobsSidecar`]. This newtype
/// wraps the `BlockWrapperInner` to ensure blobs cannot be accessed via an enum match. This would
/// circumvent empty blob reconstruction when accessing blobs.
#[derive(Clone, Debug, Derivative)]
#[derivative(PartialEq, Hash(bound = "T: EthSpec"))]
pub struct BlockWrapper<T: EthSpec>(BlockWrapperInner<T>);
/// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobsSidecar`]. /// A wrapper over a [`SignedBeaconBlock`] or a [`SignedBeaconBlockAndBlobsSidecar`].
#[derive(Clone, Debug, Derivative)] #[derive(Clone, Debug, Derivative)]
#[derivative(PartialEq, Hash(bound = "T: EthSpec"))] #[derivative(PartialEq, Hash(bound = "T: EthSpec"))]
pub enum BlockWrapper<T: EthSpec> { pub enum BlockWrapperInner<T: EthSpec> {
Block(Arc<SignedBeaconBlock<T>>), Block(Arc<SignedBeaconBlock<T>>),
BlockAndBlob(SignedBeaconBlockAndBlobsSidecar<T>), BlockAndBlob(SignedBeaconBlockAndBlobsSidecar<T>),
} }
impl<T: EthSpec> BlockWrapper<T> { impl<T: EthSpec> BlockWrapper<T> {
pub fn new(block: Arc<SignedBeaconBlock<T>>) -> Self {
Self(BlockWrapperInner::Block(block))
}
pub fn new_with_blobs(
beacon_block: Arc<SignedBeaconBlock<T>>,
blobs_sidecar: Arc<BlobsSidecar<T>>,
) -> Self {
Self(BlockWrapperInner::BlockAndBlob(
SignedBeaconBlockAndBlobsSidecar {
beacon_block,
blobs_sidecar,
},
))
}
pub fn slot(&self) -> Slot { pub fn slot(&self) -> Slot {
match self { match &self.0 {
BlockWrapper::Block(block) => block.slot(), BlockWrapperInner::Block(block) => block.slot(),
BlockWrapper::BlockAndBlob(block_sidecar_pair) => { BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => {
block_sidecar_pair.beacon_block.slot() block_sidecar_pair.beacon_block.slot()
} }
} }
} }
pub fn block(&self) -> &SignedBeaconBlock<T> { pub fn block(&self) -> &SignedBeaconBlock<T> {
match self { match &self.0 {
BlockWrapper::Block(block) => &block, BlockWrapperInner::Block(block) => &block,
BlockWrapper::BlockAndBlob(block_sidecar_pair) => &block_sidecar_pair.beacon_block, BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => &block_sidecar_pair.beacon_block,
} }
} }
pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<T>> { pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<T>> {
match self { match &self.0 {
BlockWrapper::Block(block) => block.clone(), BlockWrapperInner::Block(block) => block.clone(),
BlockWrapper::BlockAndBlob(block_sidecar_pair) => { BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => {
block_sidecar_pair.beacon_block.clone() block_sidecar_pair.beacon_block.clone()
} }
} }
@ -70,20 +93,20 @@ impl<T: EthSpec> BlockWrapper<T> {
&self, &self,
block_root: Option<Hash256>, block_root: Option<Hash256>,
) -> Result<Option<Arc<BlobsSidecar<T>>>, BlobReconstructionError> { ) -> Result<Option<Arc<BlobsSidecar<T>>>, BlobReconstructionError> {
match self { match &self.0 {
BlockWrapper::Block(block) => block BlockWrapperInner::Block(block) => block
.reconstruct_empty_blobs(block_root) .reconstruct_empty_blobs(block_root)
.map(|blob_opt| blob_opt.map(Arc::new)), .map(|blob_opt| blob_opt.map(Arc::new)),
BlockWrapper::BlockAndBlob(block_sidecar_pair) => { BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => {
Ok(Some(block_sidecar_pair.blobs_sidecar.clone())) Ok(Some(block_sidecar_pair.blobs_sidecar.clone()))
} }
} }
} }
pub fn message(&self) -> crate::BeaconBlockRef<T> { pub fn message(&self) -> crate::BeaconBlockRef<T> {
match self { match &self.0 {
BlockWrapper::Block(block) => block.message(), BlockWrapperInner::Block(block) => block.message(),
BlockWrapper::BlockAndBlob(block_sidecar_pair) => { BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => {
block_sidecar_pair.beacon_block.message() block_sidecar_pair.beacon_block.message()
} }
} }
@ -100,14 +123,14 @@ impl<T: EthSpec> BlockWrapper<T> {
Arc<SignedBeaconBlock<T>>, Arc<SignedBeaconBlock<T>>,
Result<Option<Arc<BlobsSidecar<T>>>, BlobReconstructionError>, Result<Option<Arc<BlobsSidecar<T>>>, BlobReconstructionError>,
) { ) {
match self { match self.0 {
BlockWrapper::Block(block) => { BlockWrapperInner::Block(block) => {
let blobs = block let blobs = block
.reconstruct_empty_blobs(block_root) .reconstruct_empty_blobs(block_root)
.map(|blob_opt| blob_opt.map(Arc::new)); .map(|blob_opt| blob_opt.map(Arc::new));
(block, blobs) (block, blobs)
} }
BlockWrapper::BlockAndBlob(block_sidecar_pair) => { BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => {
let SignedBeaconBlockAndBlobsSidecar { let SignedBeaconBlockAndBlobsSidecar {
beacon_block, beacon_block,
blobs_sidecar, blobs_sidecar,
@ -120,12 +143,18 @@ impl<T: EthSpec> BlockWrapper<T> {
impl<T: EthSpec> From<SignedBeaconBlock<T>> for BlockWrapper<T> { impl<T: EthSpec> From<SignedBeaconBlock<T>> for BlockWrapper<T> {
fn from(block: SignedBeaconBlock<T>) -> Self { fn from(block: SignedBeaconBlock<T>) -> Self {
BlockWrapper::Block(Arc::new(block)) BlockWrapper(BlockWrapperInner::Block(Arc::new(block)))
} }
} }
impl<T: EthSpec> From<Arc<SignedBeaconBlock<T>>> for BlockWrapper<T> { impl<T: EthSpec> From<Arc<SignedBeaconBlock<T>>> for BlockWrapper<T> {
fn from(block: Arc<SignedBeaconBlock<T>>) -> Self { fn from(block: Arc<SignedBeaconBlock<T>>) -> Self {
BlockWrapper::Block(block) BlockWrapper(BlockWrapperInner::Block(block))
}
}
impl<T: EthSpec> From<SignedBeaconBlockAndBlobsSidecar<T>> for BlockWrapper<T> {
fn from(block: SignedBeaconBlockAndBlobsSidecar<T>) -> Self {
BlockWrapper(BlockWrapperInner::BlockAndBlob(block))
} }
} }