handle no blobs from peers instead of empty blobs in range requests
parent 33d01a7911
commit cd6655dba9

beacon_node/network/src/sync/block_sidecar_coupling.rs (new file, 115 lines added)
@@ -0,0 +1,115 @@
use std::{
    collections::{hash_map::OccupiedEntry, VecDeque},
    sync::Arc,
};

use types::{
    signed_block_and_blobs::BlockWrapper, BlobsSidecar, EthSpec, SignedBeaconBlock,
    SignedBeaconBlockAndBlobsSidecar,
};

struct ReceivedData<T: EthSpec> {
    block: Option<Arc<SignedBeaconBlock<T>>>,
    blob: Option<Arc<BlobsSidecar<T>>>,
}

#[derive(Debug, Default)]
pub struct BlockBlobRequestInfo<T: EthSpec> {
    /// Blocks we have received, awaiting their corresponding sidecar.
    accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<T>>>,
    /// Sidecars we have received, awaiting their corresponding block.
    accumulated_sidecars: VecDeque<Arc<BlobsSidecar<T>>>,
    /// Whether the individual RPC request for blocks is finished or not.
    is_blocks_rpc_finished: bool,
    /// Whether the individual RPC request for sidecars is finished or not.
    is_sidecar_rpc_finished: bool,
}

pub struct BlockBlobRequestEntry<'a, K, T: EthSpec> {
    entry: OccupiedEntry<'a, K, BlockBlobRequestInfo<T>>,
}

impl<'a, K, T: EthSpec> From<OccupiedEntry<'a, K, BlockBlobRequestInfo<T>>>
    for BlockBlobRequestEntry<'a, K, T>
{
    fn from(entry: OccupiedEntry<'a, K, BlockBlobRequestInfo<T>>) -> Self {
        BlockBlobRequestEntry { entry }
    }
}

impl<T: EthSpec> BlockBlobRequestInfo<T> {
    pub fn add_block_response(&mut self, maybe_block: Option<Arc<SignedBeaconBlock<T>>>) {
        match maybe_block {
            Some(block) => self.accumulated_blocks.push_back(block),
            None => self.is_blocks_rpc_finished = true,
        }
    }

    pub fn add_sidecar_response(&mut self, maybe_sidecar: Option<Arc<BlobsSidecar<T>>>) {
        match maybe_sidecar {
            Some(sidecar) => self.accumulated_sidecars.push_back(sidecar),
            None => self.is_sidecar_rpc_finished = true,
        }
    }

    pub fn into_responses(self) -> Result<Vec<BlockWrapper<T>>, &'static str> {
        let BlockBlobRequestInfo {
            mut accumulated_blocks,
            accumulated_sidecars,
            ..
        } = self;

        // Create the storage for our pairs.
        let mut pairs = Vec::with_capacity(accumulated_blocks.len());

        // ASSUMPTION: There can't be more blobs than blocks, i.e. sending any block (empty
        // included) for a skipped slot is not permitted.
        for sidecar in accumulated_sidecars {
            let blob_slot = sidecar.beacon_block_slot;
            // First queue any blocks that might not have a blob.
            while let Some(block) = {
                // We identify those if their slot is less than the current blob's slot.
                match accumulated_blocks.front() {
                    Some(borrowed_block) if borrowed_block.slot() < blob_slot => {
                        accumulated_blocks.pop_front()
                    }
                    Some(_) => None,
                    None => {
                        // We received a blob and ran out of blocks. This is a peer error.
                        return Err("Blob without more blocks to pair with returned by peer");
                    }
                }
            } {
                pairs.push(BlockWrapper::Block { block })
            }

            // The next block must be present and must match the blob's slot.
            let next_block = accumulated_blocks
                .pop_front()
                .expect("If block stream ended, an error was previously returned");
            if next_block.slot() != blob_slot {
                // We verified that the slot of the block is not less than the slot of the blob (it
                // would have been returned before). It's also not equal, so this block is ahead of
                // the blob. This means the blob is not paired.
                return Err("Blob without a matching block returned by peer");
            }
            pairs.push(BlockWrapper::BlockAndBlob {
                block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar {
                    beacon_block: next_block,
                    blobs_sidecar: sidecar,
                },
            });
        }

        // Every remaining block does not have a blob.
        for block in accumulated_blocks {
            pairs.push(BlockWrapper::Block { block })
        }

        Ok(pairs)
    }

    pub fn is_finished(&self) -> bool {
        self.is_blocks_rpc_finished && self.is_sidecar_rpc_finished
    }
}
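For orientation, the intended contract of the new accumulator is: block and sidecar chunks arrive on two independent RPC streams, each chunk is pushed as it arrives, `None` terminates a stream, and only once both streams have terminated is the request drained into coupled responses. Below is a minimal illustrative sketch of that flow, not part of the commit; it assumes it is placed inside this module (so the struct's private fields and the file's imports are in scope), and the helper name and pre-collected Vecs are hypothetical.

// Illustrative sketch only: drive the accumulator with both streams, then drain it.
fn pair_streams<T: EthSpec>(
    blocks: Vec<Option<Arc<SignedBeaconBlock<T>>>>,
    sidecars: Vec<Option<Arc<BlobsSidecar<T>>>>,
) -> Result<Vec<BlockWrapper<T>>, &'static str> {
    let mut info = BlockBlobRequestInfo {
        accumulated_blocks: VecDeque::new(),
        accumulated_sidecars: VecDeque::new(),
        is_blocks_rpc_finished: false,
        is_sidecar_rpc_finished: false,
    };
    // Each chunk of either RPC stream is pushed as it arrives; `None` is the
    // stream terminator and marks that side of the request as finished.
    for maybe_block in blocks {
        info.add_block_response(maybe_block);
    }
    for maybe_sidecar in sidecars {
        info.add_sidecar_response(maybe_sidecar);
    }
    // Only a finished request (both terminators seen, i.e. each Vec ended with
    // `None`) is drained into coupled `BlockWrapper` responses; pairing errors
    // indicate a faulty peer.
    assert!(info.is_finished());
    info.into_responses()
}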
@@ -35,7 +35,7 @@
 use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
 use super::block_lookups::BlockLookups;
-use super::network_context::SyncNetworkContext;
+use super::network_context::{BlockOrBlob, SyncNetworkContext};
 use super::peer_sync_info::{remote_sync_type, PeerSyncType};
 use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
 use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
@@ -45,11 +45,11 @@ use crate::sync::range_sync::ExpectedBatchTy;
 use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
 use futures::StreamExt;
 use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
-use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
+use lighthouse_network::rpc::RPCError;
 use lighthouse_network::types::{NetworkGlobals, SyncState};
 use lighthouse_network::SyncInfo;
 use lighthouse_network::{PeerAction, PeerId};
-use slog::{crit, debug, error, info, trace, Logger};
+use slog::{crit, debug, error, info, trace, warn, Logger};
 use std::boxed::Box;
 use std::ops::Sub;
 use std::sync::Arc;
@@ -746,17 +746,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                 &mut self.network,
             ),
             RequestId::BackFillSync { id } => {
-                if let Some((batch_id, block)) = self.network.backfill_sync_block_response(
-                    id,
-                    beacon_block,
-                    ExpectedBatchTy::OnlyBlock,
-                ) {
+                let is_stream_terminator = beacon_block.is_none();
+                if let Some(batch_id) = self
+                    .network
+                    .backfill_sync_only_blocks_response(id, is_stream_terminator)
+                {
                     match self.backfill_sync.on_block_response(
                         &mut self.network,
                         batch_id,
                         &peer_id,
                         id,
-                        block,
+                        beacon_block.map(|block| BlockWrapper::Block { block }),
                     ) {
                         Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
                         Ok(ProcessResult::Successful) => {}
@@ -769,61 +769,125 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                 }
             }
             RequestId::RangeSync { id } => {
-                if let Some((chain_id, batch_id, block)) = self.network.range_sync_block_response(
-                    id,
-                    beacon_block,
-                    ExpectedBatchTy::OnlyBlock,
-                ) {
+                let is_stream_terminator = beacon_block.is_none();
+                if let Some((chain_id, batch_id)) = self
+                    .network
+                    .range_sync_block_response(id, is_stream_terminator)
+                {
                     self.range_sync.blocks_by_range_response(
                         &mut self.network,
                         peer_id,
                         chain_id,
                         batch_id,
                         id,
-                        block,
+                        beacon_block.map(|block| BlockWrapper::Block { block }),
                     );
                     self.update_sync_state();
                 }
             }

             RequestId::BackFillSidecarPair { id } => {
-                if let Some((batch_id, block)) = self.network.backfill_sync_block_response(
-                    id,
-                    beacon_block,
-                    ExpectedBatchTy::OnlyBlockBlobs,
-                ) {
-                    match self.backfill_sync.on_block_response(
-                        &mut self.network,
-                        batch_id,
-                        &peer_id,
-                        id,
-                        block,
-                    ) {
-                        Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
-                        Ok(ProcessResult::Successful) => {}
-                        Err(_error) => {
-                            // The backfill sync has failed, errors are reported
-                            // within.
-                            self.update_sync_state();
-                        }
-                    }
-                }
-            }
-            RequestId::RangeSidecarPair { id } => {
-                if let Some((chain_id, batch_id, block)) = self.network.range_sync_block_response(
-                    id,
-                    beacon_block,
-                    ExpectedBatchTy::OnlyBlockBlobs,
-                ) {
-                    self.range_sync.blocks_by_range_response(
-                        &mut self.network,
-                        peer_id,
-                        chain_id,
-                        batch_id,
-                        id,
-                        block,
-                    );
-                    self.update_sync_state();
-                }
-            }
-        }
-    }
+                self.block_blob_backfill_response(id, peer_id, beacon_block.into())
+            }
+            RequestId::RangeSidecarPair { id } => {
+                self.block_blob_range_response(id, peer_id, beacon_block.into())
+            }
+        }
+    }
+
+    /// Handles receiving a response for a range sync request that should have both blocks and
+    /// blobs.
+    fn block_blob_range_response(
+        &mut self,
+        id: Id,
+        peer_id: PeerId,
+        block_or_blob: BlockOrBlob<T::EthSpec>,
+    ) {
+        if let Some((chain_id, batch_id, block_responses)) = self
+            .network
+            .range_sync_block_and_blob_response(id, block_or_blob)
+        {
+            match block_responses {
+                Ok(blocks) => {
+                    for block in blocks
+                        .into_iter()
+                        .map(|block| Some(block))
+                        // chain the stream terminator
+                        .chain(vec![None])
+                    {
+                        self.range_sync.blocks_by_range_response(
+                            &mut self.network,
+                            peer_id,
+                            chain_id,
+                            batch_id,
+                            id,
+                            block,
+                        );
+                        self.update_sync_state();
+                    }
+                }
+                Err(e) => {
+                    // Inform range sync that the request needs to be treated as failed.
+                    // With time we will want to downgrade this log
+                    warn!(
+                        self.log, "Blocks and blobs request for range sync received invalid data";
+                        "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e
+                    );
+                    // TODO: penalize the peer for being a bad boy
+                    let id = RequestId::RangeSidecarPair { id };
+                    self.inject_error(peer_id, id, RPCError::InvalidData(e.into()))
+                }
+            }
+        }
+    }
+
+    /// Handles receiving a response for a backfill sync request that should have both blocks and
+    /// blobs.
+    fn block_blob_backfill_response(
+        &mut self,
+        id: Id,
+        peer_id: PeerId,
+        block_or_blob: BlockOrBlob<T::EthSpec>,
+    ) {
+        if let Some((batch_id, block_responses)) = self
+            .network
+            .backfill_sync_block_and_blob_response(id, block_or_blob)
+        {
+            match block_responses {
+                Ok(blocks) => {
+                    for block in blocks
+                        .into_iter()
+                        .map(|block| Some(block))
+                        // chain the stream terminator
+                        .chain(vec![None])
+                    {
+                        match self.backfill_sync.on_block_response(
+                            &mut self.network,
+                            batch_id,
+                            &peer_id,
+                            id,
+                            block,
+                        ) {
+                            Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
+                            Ok(ProcessResult::Successful) => {}
+                            Err(_error) => {
+                                // The backfill sync has failed, errors are reported
+                                // within.
+                                self.update_sync_state();
+                            }
+                        }
+                    }
+                }
+                Err(e) => {
+                    // Inform backfill sync that the request needs to be treated as failed.
+                    // With time we will want to downgrade this log
+                    warn!(
+                        self.log, "Blocks and blobs request for backfill received invalid data";
+                        "peer_id" => %peer_id, "batch_id" => batch_id, "error" => e
+                    );
+                    // TODO: penalize the peer for being a bad boy
+                    let id = RequestId::BackFillSidecarPair { id };
+                    self.inject_error(peer_id, id, RPCError::InvalidData(e.into()))
+                }
+            }
+        }
+    }
@@ -844,44 +908,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                 unreachable!("An only blocks request does not receive sidecars")
             }
             RequestId::BackFillSidecarPair { id } => {
-                if let Some((batch_id, block)) = self
-                    .network
-                    .backfill_sync_sidecar_response(id, maybe_sidecar)
-                {
-                    match self.backfill_sync.on_block_response(
-                        &mut self.network,
-                        batch_id,
-                        &peer_id,
-                        id,
-                        block,
-                    ) {
-                        Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
-                        Ok(ProcessResult::Successful) => {}
-                        Err(_error) => {
-                            // The backfill sync has failed, errors are reported
-                            // within.
-                            self.update_sync_state();
-                        }
-                    }
-                }
+                self.block_blob_backfill_response(id, peer_id, maybe_sidecar.into())
             }
             RequestId::RangeSync { .. } => {
-                unreachable!("And only blocks range request does not receive sidecars")
+                unreachable!("Only-blocks range requests don't receive sidecars")
             }
             RequestId::RangeSidecarPair { id } => {
-                if let Some((chain_id, batch_id, block)) =
-                    self.network.range_sync_sidecar_response(id, maybe_sidecar)
-                {
-                    self.range_sync.blocks_by_range_response(
-                        &mut self.network,
-                        peer_id,
-                        chain_id,
-                        batch_id,
-                        id,
-                        block,
-                    );
-                    self.update_sync_state();
-                }
+                self.block_blob_range_response(id, peer_id, maybe_sidecar.into())
             }
         }
     }
@@ -3,6 +3,7 @@
 //! Stores the various syncing methods for the beacon chain.
 mod backfill_sync;
 mod block_lookups;
+mod block_sidecar_coupling;
 pub mod manager;
 mod network_context;
 mod peer_sync_info;
@ -1,6 +1,7 @@
|
|||||||
//! Provides network functionality for the Syncing thread. This fundamentally wraps a network
|
//! Provides network functionality for the Syncing thread. This fundamentally wraps a network
|
||||||
//! channel and stores a global RPC ID to perform requests.
|
//! channel and stores a global RPC ID to perform requests.
|
||||||
|
|
||||||
|
use super::block_sidecar_coupling::BlockBlobRequestInfo;
|
||||||
use super::manager::{Id, RequestId as SyncRequestId};
|
use super::manager::{Id, RequestId as SyncRequestId};
|
||||||
use super::range_sync::{BatchId, ChainId, ExpectedBatchTy};
|
use super::range_sync::{BatchId, ChainId, ExpectedBatchTy};
|
||||||
use crate::beacon_processor::WorkEvent;
|
use crate::beacon_processor::WorkEvent;
|
||||||
@@ -13,59 +14,11 @@ use lighthouse_network::rpc::methods::BlobsByRangeRequest;
 use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason};
 use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
 use slog::{debug, trace, warn};
-use slot_clock::SlotClock;
 use std::collections::hash_map::Entry;
-use std::collections::VecDeque;
 use std::sync::Arc;
 use tokio::sync::mpsc;
 use types::signed_block_and_blobs::BlockWrapper;
-use types::{
-    BlobsSidecar, ChainSpec, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar,
-};
+use types::{BlobsSidecar, EthSpec, SignedBeaconBlock};
-
-#[derive(Debug, Default)]
-struct BlockBlobRequestInfo<T: EthSpec> {
-    /// Blocks we have received awaiting for their corresponding sidecar.
-    accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<T>>>,
-    /// Sidecars we have received awaiting for their corresponding block.
-    accumulated_sidecars: VecDeque<Arc<BlobsSidecar<T>>>,
-    /// Whether the individual RPC request for blocks is finished or not.
-    is_blocks_rpc_finished: bool,
-    /// Whether the individual RPC request for sidecars is finished or not.
-    is_sidecar_rpc_finished: bool,
-}
-
-impl<T: EthSpec> BlockBlobRequestInfo<T> {
-    pub fn add_block_response(&mut self, maybe_block: Option<Arc<SignedBeaconBlock<T>>>) {
-        match maybe_block {
-            Some(block) => self.accumulated_blocks.push_back(block),
-            None => self.is_blocks_rpc_finished = true,
-        }
-    }
-
-    pub fn add_sidecar_response(&mut self, maybe_sidecar: Option<Arc<BlobsSidecar<T>>>) {
-        match maybe_sidecar {
-            Some(sidecar) => self.accumulated_sidecars.push_back(sidecar),
-            None => self.is_sidecar_rpc_finished = true,
-        }
-    }
-
-    pub fn pop_response(&mut self) -> Option<SignedBeaconBlockAndBlobsSidecar<T>> {
-        if !self.accumulated_blocks.is_empty() && !self.accumulated_sidecars.is_empty() {
-            let beacon_block = self.accumulated_blocks.pop_front().expect("non empty");
-            let blobs_sidecar = self.accumulated_sidecars.pop_front().expect("non empty");
-            return Some(SignedBeaconBlockAndBlobsSidecar {
-                beacon_block,
-                blobs_sidecar,
-            });
-        }
-        None
-    }
-
-    pub fn is_finished(&self) -> bool {
-        self.is_blocks_rpc_finished && self.is_sidecar_rpc_finished
-    }
-}
-
 /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id.
 pub struct SyncNetworkContext<T: BeaconChainTypes> {
@@ -104,6 +57,24 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
     log: slog::Logger,
 }

+/// Small enumeration to make dealing with block and blob requests easier.
+pub enum BlockOrBlob<T: EthSpec> {
+    Block(Option<Arc<SignedBeaconBlock<T>>>),
+    Blob(Option<Arc<BlobsSidecar<T>>>),
+}
+
+impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlob<T> {
+    fn from(block: Option<Arc<SignedBeaconBlock<T>>>) -> Self {
+        BlockOrBlob::Block(block)
+    }
+}
+
+impl<T: EthSpec> From<Option<Arc<BlobsSidecar<T>>>> for BlockOrBlob<T> {
+    fn from(blob: Option<Arc<BlobsSidecar<T>>>) -> Self {
+        BlockOrBlob::Blob(blob)
+    }
+}
+
 impl<T: BeaconChainTypes> SyncNetworkContext<T> {
     pub fn new(
         network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
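These two `From` impls are what let the manager forward either half of a coupled response with a plain `.into()` call (`beacon_block.into()` and `maybe_sidecar.into()` in the manager hunk above). A small illustrative sketch of the dispatch they enable, not part of the commit; it assumes it sits inside this module where `BlockBlobRequestInfo` and `EthSpec` are already imported, and the helper name `accumulate` is hypothetical:

// Illustrative only: one generic entry point can accept either stream's item,
// because both Option<Arc<SignedBeaconBlock<T>>> and Option<Arc<BlobsSidecar<T>>>
// convert into BlockOrBlob<T>.
fn accumulate<T: EthSpec>(
    info: &mut BlockBlobRequestInfo<T>,
    response: impl Into<BlockOrBlob<T>>,
) {
    let block_or_blob: BlockOrBlob<T> = response.into();
    match block_or_blob {
        BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
        BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
    }
}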
@@ -300,91 +271,45 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         }
     }

-    /// Received a blocks by range response.
+    /// Response for a request that is only for blocks.
     pub fn range_sync_block_response(
         &mut self,
         request_id: Id,
-        maybe_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
-        batch_type: ExpectedBatchTy,
-    ) -> Option<(ChainId, BatchId, Option<BlockWrapper<T::EthSpec>>)> {
-        match batch_type {
-            ExpectedBatchTy::OnlyBlockBlobs => {
-                match self.range_sidecar_pair_requests.entry(request_id) {
-                    Entry::Occupied(mut entry) => {
-                        let (chain_id, batch_id, info) = entry.get_mut();
-                        let chain_id = chain_id.clone();
-                        let batch_id = batch_id.clone();
-                        let stream_terminator = maybe_block.is_none();
-                        info.add_block_response(maybe_block);
-                        let maybe_block_wrapped = info.pop_response().map(|block_sidecar_pair| {
-                            BlockWrapper::BlockAndBlob { block_sidecar_pair }
-                        });
-
-                        if stream_terminator && !info.is_finished() {
-                            return None;
-                        }
-                        if !stream_terminator && maybe_block_wrapped.is_none() {
-                            return None;
-                        }
-
-                        if info.is_finished() {
-                            entry.remove();
-                        }
-
-                        Some((chain_id, batch_id, maybe_block_wrapped))
-                    }
-                    Entry::Vacant(_) => None,
-                }
-            }
-            ExpectedBatchTy::OnlyBlock => {
-                // if the request is just for blocks then it can be removed on a stream termination
-                match maybe_block {
-                    Some(block) => {
-                        self.range_requests
-                            .get(&request_id)
-                            .cloned()
-                            .map(|(chain_id, batch_id)| {
-                                (chain_id, batch_id, Some(BlockWrapper::Block { block }))
-                            })
-                    }
-                    None => self
-                        .range_requests
-                        .remove(&request_id)
-                        .map(|(chain_id, batch_id)| (chain_id, batch_id, None)),
-                }
-            }
+        is_stream_terminator: bool,
+    ) -> Option<(ChainId, BatchId)> {
+        if is_stream_terminator {
+            self.range_requests
+                .remove(&request_id)
+                .map(|(chain_id, batch_id)| (chain_id, batch_id))
+        } else {
+            self.range_requests.get(&request_id).cloned()
         }
     }

-    pub fn range_sync_sidecar_response(
+    /// Received a blocks by range response for a request that couples blocks and blobs.
+    pub fn range_sync_block_and_blob_response(
         &mut self,
         request_id: Id,
-        maybe_sidecar: Option<Arc<BlobsSidecar<T::EthSpec>>>,
-    ) -> Option<(ChainId, BatchId, Option<BlockWrapper<T::EthSpec>>)> {
+        block_or_blob: BlockOrBlob<T::EthSpec>,
+    ) -> Option<(
+        ChainId,
+        BatchId,
+        Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>,
+    )> {
         match self.range_sidecar_pair_requests.entry(request_id) {
             Entry::Occupied(mut entry) => {
-                let (chain_id, batch_id, info) = entry.get_mut();
-                let chain_id = chain_id.clone();
-                let batch_id = batch_id.clone();
-                let stream_terminator = maybe_sidecar.is_none();
-                info.add_sidecar_response(maybe_sidecar);
-                let maybe_block = info
-                    .pop_response()
-                    .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair });
-
-                if stream_terminator && !info.is_finished() {
-                    return None;
-                }
-
-                if !stream_terminator && maybe_block.is_none() {
-                    return None;
-                }
-
+                let (_, _, info) = entry.get_mut();
+                match block_or_blob {
+                    BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
+                    BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
+                }
                 if info.is_finished() {
-                    entry.remove();
-                }
-
-                Some((chain_id, batch_id, maybe_block))
+                    // If the request is finished, unqueue everything
+                    let (chain_id, batch_id, info) = entry.remove();
+                    Some((chain_id, batch_id, info.into_responses()))
+                } else {
+                    None
+                }
             }
             Entry::Vacant(_) => None,
         }
@@ -418,65 +343,41 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         }
     }

-    /// Received a blocks by range response.
-    pub fn backfill_sync_block_response(
+    /// Response for a request that is only for blocks.
+    pub fn backfill_sync_only_blocks_response(
         &mut self,
         request_id: Id,
-        maybe_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
-        batch_type: ExpectedBatchTy,
-    ) -> Option<(BatchId, Option<BlockWrapper<T::EthSpec>>)> {
-        match batch_type {
-            ExpectedBatchTy::OnlyBlockBlobs => {
-                match self.backfill_sidecar_pair_requests.entry(request_id) {
-                    Entry::Occupied(mut entry) => {
-                        let (batch_id, info) = entry.get_mut();
-                        let batch_id = batch_id.clone();
-                        info.add_block_response(maybe_block);
-                        let maybe_block = info.pop_response().map(|block_sidecar_pair| {
-                            BlockWrapper::BlockAndBlob { block_sidecar_pair }
-                        });
-                        if info.is_finished() {
-                            entry.remove();
-                        }
-                        Some((batch_id, maybe_block))
-                    }
-                    Entry::Vacant(_) => None,
-                }
-            }
-            ExpectedBatchTy::OnlyBlock => {
-                // if the request is just for blocks then it can be removed on a stream termination
-                match maybe_block {
-                    Some(block) => self
-                        .backfill_requests
-                        .get(&request_id)
-                        .cloned()
-                        .map(|batch_id| (batch_id, Some(BlockWrapper::Block { block }))),
-                    None => self
-                        .backfill_requests
-                        .remove(&request_id)
-                        .map(|batch_id| (batch_id, None)),
-                }
-            }
+        is_stream_terminator: bool,
+    ) -> Option<BatchId> {
+        if is_stream_terminator {
+            self.backfill_requests
+                .remove(&request_id)
+                .map(|batch_id| batch_id)
+        } else {
+            self.backfill_requests.get(&request_id).cloned()
         }
     }

-    pub fn backfill_sync_sidecar_response(
+    /// Received a blocks by range response for a request that couples blocks and blobs.
+    pub fn backfill_sync_block_and_blob_response(
         &mut self,
         request_id: Id,
-        maybe_sidecar: Option<Arc<BlobsSidecar<T::EthSpec>>>,
-    ) -> Option<(BatchId, Option<BlockWrapper<T::EthSpec>>)> {
+        block_or_blob: BlockOrBlob<T::EthSpec>,
+    ) -> Option<(BatchId, Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>)> {
         match self.backfill_sidecar_pair_requests.entry(request_id) {
             Entry::Occupied(mut entry) => {
-                let (batch_id, info) = entry.get_mut();
-                let batch_id = batch_id.clone();
-                info.add_sidecar_response(maybe_sidecar);
-                let maybe_block = info
-                    .pop_response()
-                    .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair });
+                let (_, info) = entry.get_mut();
+                match block_or_blob {
+                    BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
+                    BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
+                }
                 if info.is_finished() {
-                    entry.remove();
-                }
-                Some((batch_id, maybe_block))
+                    // If the request is finished, unqueue everything
+                    let (batch_id, info) = entry.remove();
+                    Some((batch_id, info.into_responses()))
+                } else {
+                    None
+                }
             }
             Entry::Vacant(_) => None,
         }
@@ -686,13 +686,10 @@ mod tests {
         // add some peers
         let (peer1, local_info, head_info) = rig.head_peer();
        range.add_peer(&mut rig.cx, local_info, peer1, head_info);
-        let ((chain1, batch1, _), id1) = match rig.grab_request(&peer1).0 {
-            RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => (
-                rig.cx
-                    .range_sync_block_response(id, None, ExpectedBatchTy::OnlyBlock)
-                    .unwrap(),
-                id,
-            ),
+        let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 {
+            RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => {
+                (rig.cx.range_sync_response(id, true).unwrap(), id)
+            }
             other => panic!("unexpected request {:?}", other),
         };
@@ -708,13 +705,10 @@ mod tests {
         // while the ee is offline, more peers might arrive. Add a new finalized peer.
         let (peer2, local_info, finalized_info) = rig.finalized_peer();
         range.add_peer(&mut rig.cx, local_info, peer2, finalized_info);
-        let ((chain2, batch2, _), id2) = match rig.grab_request(&peer2).0 {
-            RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => (
-                rig.cx
-                    .range_sync_block_response(id, None, ExpectedBatchTy::OnlyBlock)
-                    .unwrap(),
-                id,
-            ),
+        let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 {
+            RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => {
+                (rig.cx.range_sync_response(id, true).unwrap(), id)
+            }
             other => panic!("unexpected request {:?}", other),
         };
@@ -1,13 +1,17 @@
-use crate::{Blob, EthSpec, Hash256, SignedRoot, Slot};
+use crate::test_utils::TestRandom;
+use crate::{Blob, EthSpec, Hash256, SignedBeaconBlock, SignedRoot, Slot};
 use kzg::KzgProof;
 use serde_derive::{Deserialize, Serialize};
 use ssz::Encode;
 use ssz_derive::{Decode, Encode};
 use ssz_types::VariableList;
+use test_random_derive::TestRandom;
 use tree_hash_derive::TreeHash;

 #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))]
-#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq, Default)]
+#[derive(
+    Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq, Default, TestRandom,
+)]
 #[serde(bound = "T: EthSpec")]
 pub struct BlobsSidecar<T: EthSpec> {
     pub beacon_block_root: Hash256,
@@ -23,6 +27,7 @@ impl<T: EthSpec> BlobsSidecar<T> {
     pub fn empty() -> Self {
         Self::default()
     }

     #[allow(clippy::integer_arithmetic)]
     pub fn max_size() -> usize {
         // Fixed part