thread blocks and blobs to sync (#4100)
* thread blocks and blobs to sync * satisfy dead code analysis
This commit is contained in:
parent
1a87222641
commit
3c18e1a3a4
@ -258,7 +258,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
|||||||
);
|
);
|
||||||
|
|
||||||
if let RequestId::Sync(id) = request_id {
|
if let RequestId::Sync(id) = request_id {
|
||||||
self.send_to_sync(SyncMessage::RpcBlobs {
|
self.send_to_sync(SyncMessage::RpcBlob {
|
||||||
peer_id,
|
peer_id,
|
||||||
request_id: id,
|
request_id: id,
|
||||||
blob_sidecar,
|
blob_sidecar,
|
||||||
@ -330,7 +330,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
|||||||
"Received BlobsByRoot Response";
|
"Received BlobsByRoot Response";
|
||||||
"peer" => %peer_id,
|
"peer" => %peer_id,
|
||||||
);
|
);
|
||||||
self.send_to_sync(SyncMessage::RpcBlobs {
|
self.send_to_sync(SyncMessage::RpcBlob {
|
||||||
request_id,
|
request_id,
|
||||||
peer_id,
|
peer_id,
|
||||||
blob_sidecar,
|
blob_sidecar,
|
||||||
|
@ -35,7 +35,7 @@
|
|||||||
|
|
||||||
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
|
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
|
||||||
use super::block_lookups::BlockLookups;
|
use super::block_lookups::BlockLookups;
|
||||||
use super::network_context::{BlockOrBlobs, SyncNetworkContext};
|
use super::network_context::{BlockOrBlob, SyncNetworkContext};
|
||||||
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
|
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
|
||||||
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
|
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
|
||||||
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
|
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
|
||||||
@ -86,6 +86,10 @@ pub enum RequestId {
|
|||||||
RangeBlobs { id: Id },
|
RangeBlobs { id: Id },
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(diva) I'm updating functions what at a time, but this should be revisited because I think
|
||||||
|
// some code paths that are split for blobs and blocks can be made just one after sync as a whole
|
||||||
|
// is updated.
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// A message that can be sent to the sync manager thread.
|
/// A message that can be sent to the sync manager thread.
|
||||||
pub enum SyncMessage<T: EthSpec> {
|
pub enum SyncMessage<T: EthSpec> {
|
||||||
@ -101,7 +105,7 @@ pub enum SyncMessage<T: EthSpec> {
|
|||||||
},
|
},
|
||||||
|
|
||||||
/// A blob has been received from the RPC.
|
/// A blob has been received from the RPC.
|
||||||
RpcBlobs {
|
RpcBlob {
|
||||||
request_id: RequestId,
|
request_id: RequestId,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
blob_sidecar: Option<Arc<BlobSidecar<T>>>,
|
blob_sidecar: Option<Arc<BlobSidecar<T>>>,
|
||||||
@ -554,7 +558,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
beacon_block,
|
beacon_block,
|
||||||
seen_timestamp,
|
seen_timestamp,
|
||||||
} => {
|
} => {
|
||||||
self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp);
|
self.rpc_block_or_blob_received(
|
||||||
|
request_id,
|
||||||
|
peer_id,
|
||||||
|
beacon_block.into(),
|
||||||
|
seen_timestamp,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
SyncMessage::UnknownBlock(peer_id, block, block_root) => {
|
SyncMessage::UnknownBlock(peer_id, block, block_root) => {
|
||||||
// If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore
|
// If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore
|
||||||
@ -638,12 +647,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
.block_lookups
|
.block_lookups
|
||||||
.parent_chain_processed(chain_hash, result, &mut self.network),
|
.parent_chain_processed(chain_hash, result, &mut self.network),
|
||||||
},
|
},
|
||||||
SyncMessage::RpcBlobs {
|
SyncMessage::RpcBlob {
|
||||||
request_id,
|
request_id,
|
||||||
peer_id,
|
peer_id,
|
||||||
blob_sidecar,
|
blob_sidecar,
|
||||||
seen_timestamp,
|
seen_timestamp,
|
||||||
} => self.rpc_blobs_received(request_id, peer_id, blob_sidecar, seen_timestamp),
|
} => self.rpc_block_or_blob_received(
|
||||||
|
request_id,
|
||||||
|
peer_id,
|
||||||
|
blob_sidecar.into(),
|
||||||
|
seen_timestamp,
|
||||||
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -702,30 +716,50 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn rpc_block_received(
|
fn rpc_block_or_blob_received(
|
||||||
&mut self,
|
&mut self,
|
||||||
request_id: RequestId,
|
request_id: RequestId,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
block_or_blob: BlockOrBlob<T::EthSpec>,
|
||||||
seen_timestamp: Duration,
|
seen_timestamp: Duration,
|
||||||
) {
|
) {
|
||||||
match request_id {
|
match request_id {
|
||||||
RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response(
|
RequestId::SingleBlock { id } => {
|
||||||
|
// TODO(diva) adjust when dealing with by root requests. This code is here to
|
||||||
|
// satisfy dead code analysis
|
||||||
|
match block_or_blob {
|
||||||
|
BlockOrBlob::Block(maybe_block) => {
|
||||||
|
self.block_lookups.single_block_lookup_response(
|
||||||
id,
|
id,
|
||||||
peer_id,
|
peer_id,
|
||||||
beacon_block.map(|block| block.into()),
|
maybe_block.map(BlockWrapper::Block),
|
||||||
seen_timestamp,
|
seen_timestamp,
|
||||||
&mut self.network,
|
&mut self.network,
|
||||||
),
|
)
|
||||||
RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response(
|
}
|
||||||
id,
|
BlockOrBlob::Sidecar(_) => unimplemented!("Mimatch between BlockWrapper and what the network receives needs to be handled first."),
|
||||||
peer_id,
|
}
|
||||||
beacon_block.map(|block| block.into()),
|
}
|
||||||
|
RequestId::ParentLookup { id } => {
|
||||||
|
// TODO(diva) adjust when dealing with by root requests. This code is here to
|
||||||
|
// satisfy dead code analysis
|
||||||
|
match block_or_blob {
|
||||||
|
BlockOrBlob::Block(maybe_block) => self.block_lookups.parent_lookup_response(
|
||||||
|
id,
|
||||||
|
peer_id,
|
||||||
|
maybe_block.map(BlockWrapper::Block),
|
||||||
seen_timestamp,
|
seen_timestamp,
|
||||||
&mut self.network,
|
&mut self.network,
|
||||||
),
|
),
|
||||||
|
BlockOrBlob::Sidecar(_) => unimplemented!("Mimatch between BlockWrapper and what the network receives needs to be handled first."),
|
||||||
|
}
|
||||||
|
}
|
||||||
RequestId::BackFillBlocks { id } => {
|
RequestId::BackFillBlocks { id } => {
|
||||||
let is_stream_terminator = beacon_block.is_none();
|
let maybe_block = match block_or_blob {
|
||||||
|
BlockOrBlob::Block(maybe_block) => maybe_block,
|
||||||
|
BlockOrBlob::Sidecar(_) => todo!("I think this is unreachable"),
|
||||||
|
};
|
||||||
|
let is_stream_terminator = maybe_block.is_none();
|
||||||
if let Some(batch_id) = self
|
if let Some(batch_id) = self
|
||||||
.network
|
.network
|
||||||
.backfill_sync_only_blocks_response(id, is_stream_terminator)
|
.backfill_sync_only_blocks_response(id, is_stream_terminator)
|
||||||
@ -735,7 +769,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
batch_id,
|
batch_id,
|
||||||
&peer_id,
|
&peer_id,
|
||||||
id,
|
id,
|
||||||
beacon_block.map(|block| block.into()),
|
maybe_block.map(|block| block.into()),
|
||||||
) {
|
) {
|
||||||
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
|
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
|
||||||
Ok(ProcessResult::Successful) => {}
|
Ok(ProcessResult::Successful) => {}
|
||||||
@ -748,7 +782,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
RequestId::RangeBlocks { id } => {
|
RequestId::RangeBlocks { id } => {
|
||||||
let is_stream_terminator = beacon_block.is_none();
|
let maybe_block = match block_or_blob {
|
||||||
|
BlockOrBlob::Block(maybe_block) => maybe_block,
|
||||||
|
BlockOrBlob::Sidecar(_) => todo!("I think this should be unreachable, since this is a range only-blocks request, and the network should not accept this chunk at all. Needs better handling"),
|
||||||
|
};
|
||||||
|
let is_stream_terminator = maybe_block.is_none();
|
||||||
if let Some((chain_id, batch_id)) = self
|
if let Some((chain_id, batch_id)) = self
|
||||||
.network
|
.network
|
||||||
.range_sync_block_response(id, is_stream_terminator)
|
.range_sync_block_response(id, is_stream_terminator)
|
||||||
@ -759,28 +797,28 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
chain_id,
|
chain_id,
|
||||||
batch_id,
|
batch_id,
|
||||||
id,
|
id,
|
||||||
beacon_block.map(|block| block.into()),
|
maybe_block.map(|block| block.into()),
|
||||||
);
|
);
|
||||||
self.update_sync_state();
|
self.update_sync_state();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
RequestId::BackFillBlobs { id } => {
|
RequestId::BackFillBlobs { id } => {
|
||||||
self.blobs_backfill_response(id, peer_id, beacon_block.into())
|
self.backfill_block_and_blobs_response(id, peer_id, block_or_blob)
|
||||||
}
|
}
|
||||||
RequestId::RangeBlobs { id } => {
|
RequestId::RangeBlobs { id } => {
|
||||||
self.blobs_range_response(id, peer_id, beacon_block.into())
|
self.range_block_and_blobs_response(id, peer_id, block_or_blob)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handles receiving a response for a range sync request that should have both blocks and
|
/// Handles receiving a response for a range sync request that should have both blocks and
|
||||||
/// blobs.
|
/// blobs.
|
||||||
fn blobs_range_response(
|
fn range_block_and_blobs_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
id: Id,
|
id: Id,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
block_or_blob: BlockOrBlobs<T::EthSpec>,
|
block_or_blob: BlockOrBlob<T::EthSpec>,
|
||||||
) {
|
) {
|
||||||
if let Some((chain_id, resp)) = self
|
if let Some((chain_id, resp)) = self
|
||||||
.network
|
.network
|
||||||
@ -822,11 +860,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
|
|
||||||
/// Handles receiving a response for a Backfill sync request that should have both blocks and
|
/// Handles receiving a response for a Backfill sync request that should have both blocks and
|
||||||
/// blobs.
|
/// blobs.
|
||||||
fn blobs_backfill_response(
|
fn backfill_block_and_blobs_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
id: Id,
|
id: Id,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
block_or_blob: BlockOrBlobs<T::EthSpec>,
|
block_or_blob: BlockOrBlob<T::EthSpec>,
|
||||||
) {
|
) {
|
||||||
if let Some(resp) = self
|
if let Some(resp) = self
|
||||||
.network
|
.network
|
||||||
@ -871,32 +909,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn rpc_blobs_received(
|
|
||||||
&mut self,
|
|
||||||
request_id: RequestId,
|
|
||||||
_peer_id: PeerId,
|
|
||||||
_maybe_blob: Option<Arc<BlobSidecar<<T>::EthSpec>>>,
|
|
||||||
_seen_timestamp: Duration,
|
|
||||||
) {
|
|
||||||
match request_id {
|
|
||||||
RequestId::SingleBlock { .. } | RequestId::ParentLookup { .. } => {
|
|
||||||
unreachable!("There is no such thing as a singular 'by root' glob request that is not accompanied by the block")
|
|
||||||
}
|
|
||||||
RequestId::BackFillBlocks { .. } => {
|
|
||||||
unreachable!("An only blocks request does not receive sidecars")
|
|
||||||
}
|
|
||||||
RequestId::BackFillBlobs { .. } => {
|
|
||||||
unimplemented!("Adjust backfill sync");
|
|
||||||
}
|
|
||||||
RequestId::RangeBlocks { .. } => {
|
|
||||||
unreachable!("Only-blocks range requests don't receive sidecars")
|
|
||||||
}
|
|
||||||
RequestId::RangeBlobs { id: _ } => {
|
|
||||||
unimplemented!("Adjust range");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<IgnoredOkVal, T: EthSpec> From<Result<IgnoredOkVal, BlockError<T>>> for BlockProcessResult<T> {
|
impl<IgnoredOkVal, T: EthSpec> From<Result<IgnoredOkVal, BlockError<T>>> for BlockProcessResult<T> {
|
||||||
|
@ -75,20 +75,20 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Small enumeration to make dealing with block and blob requests easier.
|
/// Small enumeration to make dealing with block and blob requests easier.
|
||||||
pub enum BlockOrBlobs<T: EthSpec> {
|
pub enum BlockOrBlob<T: EthSpec> {
|
||||||
Block(Option<Arc<SignedBeaconBlock<T>>>),
|
Block(Option<Arc<SignedBeaconBlock<T>>>),
|
||||||
Blobs(Option<Arc<BlobSidecar<T>>>),
|
Sidecar(Option<Arc<BlobSidecar<T>>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlobs<T> {
|
impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlob<T> {
|
||||||
fn from(block: Option<Arc<SignedBeaconBlock<T>>>) -> Self {
|
fn from(block: Option<Arc<SignedBeaconBlock<T>>>) -> Self {
|
||||||
BlockOrBlobs::Block(block)
|
BlockOrBlob::Block(block)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: EthSpec> From<Option<Arc<BlobSidecar<T>>>> for BlockOrBlobs<T> {
|
impl<T: EthSpec> From<Option<Arc<BlobSidecar<T>>>> for BlockOrBlob<T> {
|
||||||
fn from(blob: Option<Arc<BlobSidecar<T>>>) -> Self {
|
fn from(blob: Option<Arc<BlobSidecar<T>>>) -> Self {
|
||||||
BlockOrBlobs::Blobs(blob)
|
BlockOrBlob::Sidecar(blob)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -311,15 +311,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
pub fn range_sync_block_and_blob_response(
|
pub fn range_sync_block_and_blob_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
request_id: Id,
|
request_id: Id,
|
||||||
block_or_blob: BlockOrBlobs<T::EthSpec>,
|
block_or_blob: BlockOrBlob<T::EthSpec>,
|
||||||
) -> Option<(ChainId, BlocksAndBlobsByRangeResponse<T::EthSpec>)> {
|
) -> Option<(ChainId, BlocksAndBlobsByRangeResponse<T::EthSpec>)> {
|
||||||
match self.range_blocks_and_blobs_requests.entry(request_id) {
|
match self.range_blocks_and_blobs_requests.entry(request_id) {
|
||||||
Entry::Occupied(mut entry) => {
|
Entry::Occupied(mut entry) => {
|
||||||
let req = entry.get_mut();
|
let req = entry.get_mut();
|
||||||
let info = &mut req.block_blob_info;
|
let info = &mut req.block_blob_info;
|
||||||
match block_or_blob {
|
match block_or_blob {
|
||||||
BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block),
|
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
|
||||||
BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
|
BlockOrBlob::Sidecar(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
|
||||||
}
|
}
|
||||||
if info.is_finished() {
|
if info.is_finished() {
|
||||||
// If the request is finished, dequeue everything
|
// If the request is finished, dequeue everything
|
||||||
@ -402,14 +402,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
pub fn backfill_sync_block_and_blob_response(
|
pub fn backfill_sync_block_and_blob_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
request_id: Id,
|
request_id: Id,
|
||||||
block_or_blob: BlockOrBlobs<T::EthSpec>,
|
block_or_blob: BlockOrBlob<T::EthSpec>,
|
||||||
) -> Option<BlocksAndBlobsByRangeResponse<T::EthSpec>> {
|
) -> Option<BlocksAndBlobsByRangeResponse<T::EthSpec>> {
|
||||||
match self.backfill_blocks_and_blobs_requests.entry(request_id) {
|
match self.backfill_blocks_and_blobs_requests.entry(request_id) {
|
||||||
Entry::Occupied(mut entry) => {
|
Entry::Occupied(mut entry) => {
|
||||||
let (_, info) = entry.get_mut();
|
let (_, info) = entry.get_mut();
|
||||||
match block_or_blob {
|
match block_or_blob {
|
||||||
BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block),
|
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
|
||||||
BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
|
BlockOrBlob::Sidecar(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
|
||||||
}
|
}
|
||||||
if info.is_finished() {
|
if info.is_finished() {
|
||||||
// If the request is finished, dequeue everything
|
// If the request is finished, dequeue everything
|
||||||
|
Loading…
Reference in New Issue
Block a user