Merge pull request #3829 from divagant-martian/handle-no-blob-range-response

Handle peers sending no blob when the blob is empty in range responses
This commit is contained in:
realbigsean 2022-12-23 10:15:30 -05:00 committed by GitHub
commit 4d50fa36bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 279 additions and 268 deletions

View File

@ -0,0 +1,79 @@
use std::{collections::VecDeque, sync::Arc};
use types::{
signed_block_and_blobs::BlockWrapper, BlobsSidecar, EthSpec, SignedBeaconBlock,
SignedBeaconBlockAndBlobsSidecar,
};
#[derive(Debug, Default)]
pub struct BlockBlobRequestInfo<T: EthSpec> {
/// Blocks we have received awaiting for their corresponding sidecar.
accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<T>>>,
/// Sidecars we have received awaiting for their corresponding block.
accumulated_sidecars: VecDeque<Arc<BlobsSidecar<T>>>,
/// Whether the individual RPC request for blocks is finished or not.
is_blocks_stream_terminated: bool,
/// Whether the individual RPC request for sidecars is finished or not.
is_sidecars_stream_terminated: bool,
}
impl<T: EthSpec> BlockBlobRequestInfo<T> {
pub fn add_block_response(&mut self, maybe_block: Option<Arc<SignedBeaconBlock<T>>>) {
match maybe_block {
Some(block) => self.accumulated_blocks.push_back(block),
None => self.is_blocks_stream_terminated = true,
}
}
pub fn add_sidecar_response(&mut self, maybe_sidecar: Option<Arc<BlobsSidecar<T>>>) {
match maybe_sidecar {
Some(sidecar) => self.accumulated_sidecars.push_back(sidecar),
None => self.is_sidecars_stream_terminated = true,
}
}
pub fn into_responses(self) -> Result<Vec<BlockWrapper<T>>, &'static str> {
let BlockBlobRequestInfo {
accumulated_blocks,
mut accumulated_sidecars,
..
} = self;
// ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any blob (empty
// included) for a skipped slot is not permitted.
let pairs = accumulated_blocks
.into_iter()
.map(|beacon_block| {
if accumulated_sidecars
.front()
.map(|sidecar| sidecar.beacon_block_slot == beacon_block.slot())
.unwrap_or(false)
{
let blobs_sidecar =
accumulated_sidecars.pop_front().ok_or("missing sidecar")?;
Ok(BlockWrapper::BlockAndBlob {
block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar {
beacon_block,
blobs_sidecar,
},
})
} else {
Ok(BlockWrapper::Block {
block: beacon_block,
})
}
})
.collect::<Result<Vec<_>, _>>();
// if accumulated sidecars is not empty, throw an error.
if !accumulated_sidecars.is_empty() {
return Err("Received more sidecars than blocks");
}
pairs
}
pub fn is_finished(&self) -> bool {
self.is_blocks_stream_terminated && self.is_sidecars_stream_terminated
}
}

View File

@ -35,7 +35,7 @@
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::BlockLookups;
use super::network_context::SyncNetworkContext;
use super::network_context::{BlockOrBlob, SyncNetworkContext};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
@ -45,11 +45,11 @@ use crate::sync::range_sync::ExpectedBatchTy;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
use futures::StreamExt;
use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
use lighthouse_network::rpc::RPCError;
use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::SyncInfo;
use lighthouse_network::{PeerAction, PeerId};
use slog::{crit, debug, error, info, trace, Logger};
use slog::{crit, debug, error, info, trace, warn, Logger};
use std::boxed::Box;
use std::ops::Sub;
use std::sync::Arc;
@ -746,17 +746,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
&mut self.network,
),
RequestId::BackFillSync { id } => {
if let Some((batch_id, block)) = self.network.backfill_sync_block_response(
id,
beacon_block,
ExpectedBatchTy::OnlyBlock,
) {
let is_stream_terminator = beacon_block.is_none();
if let Some(batch_id) = self
.network
.backfill_sync_only_blocks_response(id, is_stream_terminator)
{
match self.backfill_sync.on_block_response(
&mut self.network,
batch_id,
&peer_id,
id,
block,
beacon_block.map(|block| BlockWrapper::Block { block }),
) {
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
Ok(ProcessResult::Successful) => {}
@ -769,11 +769,52 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
RequestId::RangeSync { id } => {
if let Some((chain_id, batch_id, block)) = self.network.range_sync_block_response(
let is_stream_terminator = beacon_block.is_none();
if let Some((chain_id, batch_id)) = self
.network
.range_sync_block_response(id, is_stream_terminator)
{
self.range_sync.blocks_by_range_response(
&mut self.network,
peer_id,
chain_id,
batch_id,
id,
beacon_block,
ExpectedBatchTy::OnlyBlock,
beacon_block.map(|block| BlockWrapper::Block { block }),
);
self.update_sync_state();
}
}
RequestId::BackFillSidecarPair { id } => {
self.block_blob_backfill_response(id, peer_id, beacon_block.into())
}
RequestId::RangeSidecarPair { id } => {
self.block_blob_range_response(id, peer_id, beacon_block.into())
}
}
}
/// Handles receiving a response for a range sync request that should have both blocks and
/// blobs.
fn block_blob_range_response(
&mut self,
id: Id,
peer_id: PeerId,
block_or_blob: BlockOrBlob<T::EthSpec>,
) {
if let Some((chain_id, batch_id, block_responses)) = self
.network
.range_sync_block_and_blob_response(id, block_or_blob)
{
match block_responses {
Ok(blocks) => {
for block in blocks
.into_iter()
.map(|block| Some(block))
// chain the stream terminator
.chain(vec![None])
{
self.range_sync.blocks_by_range_response(
&mut self.network,
peer_id,
@ -785,13 +826,41 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.update_sync_state();
}
}
Err(e) => {
// inform range that the request needs to be treated as failed
// With time we will want to downgrade this log
warn!(
self.log, "Blocks and blobs request for range received invalid data";
"peer_id" => %peer_id, "batch_id" => batch_id, "error" => e
);
// TODO: penalize the peer for being a bad boy
let id = RequestId::RangeSidecarPair { id };
self.inject_error(peer_id, id, RPCError::InvalidData(e.into()))
}
}
}
}
RequestId::BackFillSidecarPair { id } => {
if let Some((batch_id, block)) = self.network.backfill_sync_block_response(
id,
beacon_block,
ExpectedBatchTy::OnlyBlockBlobs,
/// Handles receiving a response for a Backfill sync request that should have both blocks and
/// blobs.
fn block_blob_backfill_response(
&mut self,
id: Id,
peer_id: PeerId,
block_or_blob: BlockOrBlob<T::EthSpec>,
) {
if let Some((batch_id, block_responses)) = self
.network
.backfill_sync_block_and_blob_response(id, block_or_blob)
{
match block_responses {
Ok(blocks) => {
for block in blocks
.into_iter()
.map(|block| Some(block))
// chain the stream terminator
.chain(vec![None])
{
match self.backfill_sync.on_block_response(
&mut self.network,
batch_id,
@ -809,21 +878,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
}
RequestId::RangeSidecarPair { id } => {
if let Some((chain_id, batch_id, block)) = self.network.range_sync_block_response(
id,
beacon_block,
ExpectedBatchTy::OnlyBlockBlobs,
) {
self.range_sync.blocks_by_range_response(
&mut self.network,
peer_id,
chain_id,
batch_id,
id,
block,
Err(e) => {
// inform backfill that the request needs to be treated as failed
// With time we will want to downgrade this log
warn!(
self.log, "Blocks and blobs request for backfill received invalid data";
"peer_id" => %peer_id, "batch_id" => batch_id, "error" => e
);
self.update_sync_state();
// TODO: penalize the peer for being a bad boy
let id = RequestId::BackFillSidecarPair { id };
self.inject_error(peer_id, id, RPCError::InvalidData(e.into()))
}
}
}
@ -844,44 +908,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
unreachable!("An only blocks request does not receive sidecars")
}
RequestId::BackFillSidecarPair { id } => {
if let Some((batch_id, block)) = self
.network
.backfill_sync_sidecar_response(id, maybe_sidecar)
{
match self.backfill_sync.on_block_response(
&mut self.network,
batch_id,
&peer_id,
id,
block,
) {
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
Ok(ProcessResult::Successful) => {}
Err(_error) => {
// The backfill sync has failed, errors are reported
// within.
self.update_sync_state();
}
}
}
self.block_blob_backfill_response(id, peer_id, maybe_sidecar.into())
}
RequestId::RangeSync { .. } => {
unreachable!("And only blocks range request does not receive sidecars")
unreachable!("Only-blocks range requests don't receive sidecars")
}
RequestId::RangeSidecarPair { id } => {
if let Some((chain_id, batch_id, block)) =
self.network.range_sync_sidecar_response(id, maybe_sidecar)
{
self.range_sync.blocks_by_range_response(
&mut self.network,
peer_id,
chain_id,
batch_id,
id,
block,
);
self.update_sync_state();
}
self.block_blob_range_response(id, peer_id, maybe_sidecar.into())
}
}
}

View File

@ -3,6 +3,7 @@
//! Stores the various syncing methods for the beacon chain.
mod backfill_sync;
mod block_lookups;
mod block_sidecar_coupling;
pub mod manager;
mod network_context;
mod peer_sync_info;

View File

@ -1,6 +1,7 @@
//! Provides network functionality for the Syncing thread. This fundamentally wraps a network
//! channel and stores a global RPC ID to perform requests.
use super::block_sidecar_coupling::BlockBlobRequestInfo;
use super::manager::{Id, RequestId as SyncRequestId};
use super::range_sync::{BatchId, ChainId, ExpectedBatchTy};
use crate::beacon_processor::WorkEvent;
@ -13,59 +14,11 @@ use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
use slog::{debug, trace, warn};
use slot_clock::SlotClock;
use std::collections::hash_map::Entry;
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::signed_block_and_blobs::BlockWrapper;
use types::{
BlobsSidecar, ChainSpec, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar,
};
#[derive(Debug, Default)]
struct BlockBlobRequestInfo<T: EthSpec> {
/// Blocks we have received awaiting for their corresponding sidecar.
accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<T>>>,
/// Sidecars we have received awaiting for their corresponding block.
accumulated_sidecars: VecDeque<Arc<BlobsSidecar<T>>>,
/// Whether the individual RPC request for blocks is finished or not.
is_blocks_rpc_finished: bool,
/// Whether the individual RPC request for sidecars is finished or not.
is_sidecar_rpc_finished: bool,
}
impl<T: EthSpec> BlockBlobRequestInfo<T> {
pub fn add_block_response(&mut self, maybe_block: Option<Arc<SignedBeaconBlock<T>>>) {
match maybe_block {
Some(block) => self.accumulated_blocks.push_back(block),
None => self.is_blocks_rpc_finished = true,
}
}
pub fn add_sidecar_response(&mut self, maybe_sidecar: Option<Arc<BlobsSidecar<T>>>) {
match maybe_sidecar {
Some(sidecar) => self.accumulated_sidecars.push_back(sidecar),
None => self.is_sidecar_rpc_finished = true,
}
}
pub fn pop_response(&mut self) -> Option<SignedBeaconBlockAndBlobsSidecar<T>> {
if !self.accumulated_blocks.is_empty() && !self.accumulated_sidecars.is_empty() {
let beacon_block = self.accumulated_blocks.pop_front().expect("non empty");
let blobs_sidecar = self.accumulated_sidecars.pop_front().expect("non empty");
return Some(SignedBeaconBlockAndBlobsSidecar {
beacon_block,
blobs_sidecar,
});
}
None
}
pub fn is_finished(&self) -> bool {
self.is_blocks_rpc_finished && self.is_sidecar_rpc_finished
}
}
use types::{BlobsSidecar, EthSpec, SignedBeaconBlock};
/// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id.
pub struct SyncNetworkContext<T: BeaconChainTypes> {
@ -104,6 +57,24 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
log: slog::Logger,
}
/// Small enumeration to make dealing with block and blob requests easier.
pub enum BlockOrBlob<T: EthSpec> {
Block(Option<Arc<SignedBeaconBlock<T>>>),
Blob(Option<Arc<BlobsSidecar<T>>>),
}
impl<T: EthSpec> From<Option<Arc<SignedBeaconBlock<T>>>> for BlockOrBlob<T> {
fn from(block: Option<Arc<SignedBeaconBlock<T>>>) -> Self {
BlockOrBlob::Block(block)
}
}
impl<T: EthSpec> From<Option<Arc<BlobsSidecar<T>>>> for BlockOrBlob<T> {
fn from(blob: Option<Arc<BlobsSidecar<T>>>) -> Self {
BlockOrBlob::Blob(blob)
}
}
impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn new(
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
@ -300,91 +271,43 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
}
/// Received a blocks by range response.
/// Response for a request that is only for blocks.
pub fn range_sync_block_response(
&mut self,
request_id: Id,
maybe_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
batch_type: ExpectedBatchTy,
) -> Option<(ChainId, BatchId, Option<BlockWrapper<T::EthSpec>>)> {
match batch_type {
ExpectedBatchTy::OnlyBlockBlobs => {
match self.range_sidecar_pair_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let (chain_id, batch_id, info) = entry.get_mut();
let chain_id = chain_id.clone();
let batch_id = batch_id.clone();
let stream_terminator = maybe_block.is_none();
info.add_block_response(maybe_block);
let maybe_block_wrapped = info.pop_response().map(|block_sidecar_pair| {
BlockWrapper::BlockAndBlob { block_sidecar_pair }
});
if stream_terminator && !info.is_finished() {
return None;
}
if !stream_terminator && maybe_block_wrapped.is_none() {
return None;
}
if info.is_finished() {
entry.remove();
}
Some((chain_id, batch_id, maybe_block_wrapped))
}
Entry::Vacant(_) => None,
}
}
ExpectedBatchTy::OnlyBlock => {
// if the request is just for blocks then it can be removed on a stream termination
match maybe_block {
Some(block) => {
self.range_requests
.get(&request_id)
.cloned()
.map(|(chain_id, batch_id)| {
(chain_id, batch_id, Some(BlockWrapper::Block { block }))
})
}
None => self
.range_requests
.remove(&request_id)
.map(|(chain_id, batch_id)| (chain_id, batch_id, None)),
}
}
is_stream_terminator: bool,
) -> Option<(ChainId, BatchId)> {
if is_stream_terminator {
self.range_requests.remove(&request_id)
} else {
self.range_requests.get(&request_id).copied()
}
}
pub fn range_sync_sidecar_response(
/// Received a blocks by range response for a request that couples blocks and blobs.
pub fn range_sync_block_and_blob_response(
&mut self,
request_id: Id,
maybe_sidecar: Option<Arc<BlobsSidecar<T::EthSpec>>>,
) -> Option<(ChainId, BatchId, Option<BlockWrapper<T::EthSpec>>)> {
block_or_blob: BlockOrBlob<T::EthSpec>,
) -> Option<(
ChainId,
BatchId,
Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>,
)> {
match self.range_sidecar_pair_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let (chain_id, batch_id, info) = entry.get_mut();
let chain_id = chain_id.clone();
let batch_id = batch_id.clone();
let stream_terminator = maybe_sidecar.is_none();
info.add_sidecar_response(maybe_sidecar);
let maybe_block = info
.pop_response()
.map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair });
if stream_terminator && !info.is_finished() {
return None;
let (_, _, info) = entry.get_mut();
match block_or_blob {
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
}
if !stream_terminator && maybe_block.is_none() {
return None;
}
if info.is_finished() {
entry.remove();
// If the request is finished, dequeue everything
let (chain_id, batch_id, info) = entry.remove();
Some((chain_id, batch_id, info.into_responses()))
} else {
None
}
Some((chain_id, batch_id, maybe_block))
}
Entry::Vacant(_) => None,
}
@ -418,65 +341,41 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
}
/// Received a blocks by range response.
pub fn backfill_sync_block_response(
/// Response for a request that is only for blocks.
pub fn backfill_sync_only_blocks_response(
&mut self,
request_id: Id,
maybe_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
batch_type: ExpectedBatchTy,
) -> Option<(BatchId, Option<BlockWrapper<T::EthSpec>>)> {
match batch_type {
ExpectedBatchTy::OnlyBlockBlobs => {
match self.backfill_sidecar_pair_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let (batch_id, info) = entry.get_mut();
let batch_id = batch_id.clone();
info.add_block_response(maybe_block);
let maybe_block = info.pop_response().map(|block_sidecar_pair| {
BlockWrapper::BlockAndBlob { block_sidecar_pair }
});
if info.is_finished() {
entry.remove();
}
Some((batch_id, maybe_block))
}
Entry::Vacant(_) => None,
}
}
ExpectedBatchTy::OnlyBlock => {
// if the request is just for blocks then it can be removed on a stream termination
match maybe_block {
Some(block) => self
.backfill_requests
.get(&request_id)
.cloned()
.map(|batch_id| (batch_id, Some(BlockWrapper::Block { block }))),
None => self
.backfill_requests
is_stream_terminator: bool,
) -> Option<BatchId> {
if is_stream_terminator {
self.backfill_requests
.remove(&request_id)
.map(|batch_id| (batch_id, None)),
}
}
.map(|batch_id| batch_id)
} else {
self.backfill_requests.get(&request_id).copied()
}
}
pub fn backfill_sync_sidecar_response(
/// Received a blocks by range response for a request that couples blocks and blobs.
pub fn backfill_sync_block_and_blob_response(
&mut self,
request_id: Id,
maybe_sidecar: Option<Arc<BlobsSidecar<T::EthSpec>>>,
) -> Option<(BatchId, Option<BlockWrapper<T::EthSpec>>)> {
block_or_blob: BlockOrBlob<T::EthSpec>,
) -> Option<(BatchId, Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>)> {
match self.backfill_sidecar_pair_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let (batch_id, info) = entry.get_mut();
let batch_id = batch_id.clone();
info.add_sidecar_response(maybe_sidecar);
let maybe_block = info
.pop_response()
.map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair });
if info.is_finished() {
entry.remove();
let (_, info) = entry.get_mut();
match block_or_blob {
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
}
if info.is_finished() {
// If the request is finished, dequeue everything
let (batch_id, info) = entry.remove();
Some((batch_id, info.into_responses()))
} else {
None
}
Some((batch_id, maybe_block))
}
Entry::Vacant(_) => None,
}

View File

@ -686,13 +686,10 @@ mod tests {
// add some peers
let (peer1, local_info, head_info) = rig.head_peer();
range.add_peer(&mut rig.cx, local_info, peer1, head_info);
let ((chain1, batch1, _), id1) = match rig.grab_request(&peer1).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => (
rig.cx
.range_sync_block_response(id, None, ExpectedBatchTy::OnlyBlock)
.unwrap(),
id,
),
let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => {
(rig.cx.range_sync_response(id, true).unwrap(), id)
}
other => panic!("unexpected request {:?}", other),
};
@ -708,13 +705,10 @@ mod tests {
// while the ee is offline, more peers might arrive. Add a new finalized peer.
let (peer2, local_info, finalized_info) = rig.finalized_peer();
range.add_peer(&mut rig.cx, local_info, peer2, finalized_info);
let ((chain2, batch2, _), id2) = match rig.grab_request(&peer2).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => (
rig.cx
.range_sync_block_response(id, None, ExpectedBatchTy::OnlyBlock)
.unwrap(),
id,
),
let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => {
(rig.cx.range_sync_response(id, true).unwrap(), id)
}
other => panic!("unexpected request {:?}", other),
};

View File

@ -1,13 +1,17 @@
use crate::{Blob, EthSpec, Hash256, SignedRoot, Slot};
use crate::test_utils::TestRandom;
use crate::{Blob, EthSpec, Hash256, SignedBeaconBlock, SignedRoot, Slot};
use kzg::KzgProof;
use serde_derive::{Deserialize, Serialize};
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
use test_random_derive::TestRandom;
use tree_hash_derive::TreeHash;
#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))]
#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq, Default)]
#[derive(
Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq, Default, TestRandom,
)]
#[serde(bound = "T: EthSpec")]
pub struct BlobsSidecar<T: EthSpec> {
pub beacon_block_root: Hash256,
@ -23,6 +27,7 @@ impl<T: EthSpec> BlobsSidecar<T> {
pub fn empty() -> Self {
Self::default()
}
#[allow(clippy::integer_arithmetic)]
pub fn max_size() -> usize {
// Fixed part