Refactor deneb networking (#4561)

* Revert "fix merge"

This reverts commit 405e95b0ce.

* refactor deneb block processing

* cargo fmt

* make block and blob single lookups generic

* get tests compiling

* clean up everything add child component, fix peer scoring and retry logic

* smol cleanup and a bugfix

* remove ParentLookupReqId

* Update beacon_node/network/src/sync/manager.rs

Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>

* Update beacon_node/network/src/sync/manager.rs

Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>

* update unreachables to crits

* Revert "update unreachables to crits"

This reverts commit 064bf64dff86b3229316aeed0431c3f4251571a5.

* update make request/build request to make more sense

* pr feedback

* Update beacon_node/network/src/sync/block_lookups/mod.rs

Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>

* Update beacon_node/network/src/sync/block_lookups/mod.rs

Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>

* more pr feedback, fix availability check error handling

* improve block component processed log

---------

Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
realbigsean 2023-08-07 14:16:21 -04:00 committed by GitHub
parent c8ea3e1c86
commit 731b7e7af5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 2299 additions and 2054 deletions

View File

@ -6,8 +6,10 @@ use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::{data_availability_checker, GossipVerifiedBlock, PayloadVerificationOutcome}; use crate::{data_availability_checker, GossipVerifiedBlock, PayloadVerificationOutcome};
use derivative::Derivative; use derivative::Derivative;
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
use state_processing::ConsensusContext; use state_processing::ConsensusContext;
use std::sync::Arc; use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{ use types::{
blob_sidecar::BlobIdentifier, ssz_tagged_beacon_state, ssz_tagged_signed_beacon_block, blob_sidecar::BlobIdentifier, ssz_tagged_beacon_state, ssz_tagged_signed_beacon_block,
ssz_tagged_signed_beacon_block_arc, ssz_tagged_signed_beacon_block_arc,
@ -73,6 +75,22 @@ impl<E: EthSpec> RpcBlock<E> {
Ok(Self { block: inner }) Ok(Self { block: inner })
} }
pub fn new_from_fixed(
block: Arc<SignedBeaconBlock<E>>,
blobs: FixedBlobSidecarList<E>,
) -> Result<Self, AvailabilityCheckError> {
let filtered = blobs
.into_iter()
.filter_map(|b| b.clone())
.collect::<Vec<_>>();
let blobs = if filtered.is_empty() {
None
} else {
Some(VariableList::from(filtered))
};
Self::new(block, blobs)
}
pub fn deconstruct(self) -> (Arc<SignedBeaconBlock<E>>, Option<BlobSidecarList<E>>) { pub fn deconstruct(self) -> (Arc<SignedBeaconBlock<E>>, Option<BlobSidecarList<E>>) {
match self.block { match self.block {
RpcBlockInner::Block(block) => (block, None), RpcBlockInner::Block(block) => (block, None),

View File

@ -41,7 +41,6 @@ pub enum AvailabilityCheckError {
num_blobs: usize, num_blobs: usize,
}, },
MissingBlobs, MissingBlobs,
TxKzgCommitmentMismatch(String),
KzgCommitmentMismatch { KzgCommitmentMismatch {
blob_index: u64, blob_index: u64,
}, },

View File

@ -1169,7 +1169,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
AvailabilityCheckError::Kzg(_) AvailabilityCheckError::Kzg(_)
| AvailabilityCheckError::KzgVerificationFailed | AvailabilityCheckError::KzgVerificationFailed
| AvailabilityCheckError::NumBlobsMismatch { .. } | AvailabilityCheckError::NumBlobsMismatch { .. }
| AvailabilityCheckError::TxKzgCommitmentMismatch(_)
| AvailabilityCheckError::BlobIndexInvalid(_) | AvailabilityCheckError::BlobIndexInvalid(_)
| AvailabilityCheckError::UnorderedBlobs { .. } | AvailabilityCheckError::UnorderedBlobs { .. }
| AvailabilityCheckError::BlockBlobRootMismatch { .. } | AvailabilityCheckError::BlockBlobRootMismatch { .. }

View File

@ -434,6 +434,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
seen_timestamp: Duration, seen_timestamp: Duration,
process_type: BlockProcessType, process_type: BlockProcessType,
) -> Result<(), Error<T::EthSpec>> { ) -> Result<(), Error<T::EthSpec>> {
let blob_count = blobs.iter().filter(|b| b.is_some()).count();
if blob_count == 0 {
return Ok(());
}
let process_fn = self.clone().generate_rpc_blobs_process_fn( let process_fn = self.clone().generate_rpc_blobs_process_fn(
block_root, block_root,
blobs, blobs,

View File

@ -1,6 +1,5 @@
use crate::metrics; use crate::metrics;
use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE}; use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE};
use crate::sync::manager::ResponseType;
use crate::sync::BatchProcessResult; use crate::sync::BatchProcessResult;
use crate::sync::{ use crate::sync::{
manager::{BlockProcessType, SyncMessage}, manager::{BlockProcessType, SyncMessage},
@ -96,7 +95,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.send_sync_message(SyncMessage::BlockComponentProcessed { self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type, process_type,
result: crate::sync::manager::BlockProcessingResult::Ignored, result: crate::sync::manager::BlockProcessingResult::Ignored,
response_type: crate::sync::manager::ResponseType::Block,
}); });
}; };
(process_fn, Box::new(ignore_fn)) (process_fn, Box::new(ignore_fn))
@ -249,7 +247,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.send_sync_message(SyncMessage::BlockComponentProcessed { self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type, process_type,
result: result.into(), result: result.into(),
response_type: ResponseType::Block,
}); });
// Drop the handle to remove the entry from the cache // Drop the handle to remove the entry from the cache
@ -301,7 +298,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.send_sync_message(SyncMessage::BlockComponentProcessed { self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type, process_type,
result: result.into(), result: result.into(),
response_type: ResponseType::Blob,
}); });
} }

View File

@ -21,7 +21,7 @@ use lighthouse_network::{
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
}; };
use logging::TimeLatch; use logging::TimeLatch;
use slog::{debug, o, trace}; use slog::{crit, debug, o, trace};
use slog::{error, warn}; use slog::{error, warn};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
@ -482,15 +482,22 @@ impl<T: BeaconChainTypes> Router<T> {
) { ) {
let request_id = match request_id { let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id { RequestId::Sync(sync_id) => match sync_id {
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => { SyncId::SingleBlock { .. }
unreachable!("Block lookups do not request BBRange requests") | SyncId::SingleBlob { .. }
| SyncId::ParentLookup { .. }
| SyncId::ParentLookupBlob { .. } => {
crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
return;
} }
id @ (SyncId::BackFillBlocks { .. } id @ (SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. } | SyncId::RangeBlocks { .. }
| SyncId::BackFillBlockAndBlobs { .. } | SyncId::BackFillBlockAndBlobs { .. }
| SyncId::RangeBlockAndBlobs { .. }) => id, | SyncId::RangeBlockAndBlobs { .. }) => id,
}, },
RequestId::Router => unreachable!("All BBRange requests belong to sync"), RequestId::Router => {
crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id);
return;
}
}; };
trace!( trace!(
@ -548,10 +555,18 @@ impl<T: BeaconChainTypes> Router<T> {
| SyncId::RangeBlocks { .. } | SyncId::RangeBlocks { .. }
| SyncId::RangeBlockAndBlobs { .. } | SyncId::RangeBlockAndBlobs { .. }
| SyncId::BackFillBlockAndBlobs { .. } => { | SyncId::BackFillBlockAndBlobs { .. } => {
unreachable!("Batch syncing do not request BBRoot requests") crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id);
return;
}
SyncId::SingleBlob { .. } | SyncId::ParentLookupBlob { .. } => {
crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id);
return;
} }
}, },
RequestId::Router => unreachable!("All BBRoot requests belong to sync"), RequestId::Router => {
crit!(self.log, "All BBRoot requests belong to sync"; "peer_id" => %peer_id);
return;
}
}; };
trace!( trace!(
@ -576,15 +591,23 @@ impl<T: BeaconChainTypes> Router<T> {
) { ) {
let request_id = match request_id { let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id { RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, id @ (SyncId::SingleBlob { .. } | SyncId::ParentLookupBlob { .. }) => id,
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
return;
}
SyncId::BackFillBlocks { .. } SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. } | SyncId::RangeBlocks { .. }
| SyncId::RangeBlockAndBlobs { .. } | SyncId::RangeBlockAndBlobs { .. }
| SyncId::BackFillBlockAndBlobs { .. } => { | SyncId::BackFillBlockAndBlobs { .. } => {
unreachable!("Batch syncing does not request BBRoot requests") crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id);
return;
} }
}, },
RequestId::Router => unreachable!("All BlobsByRoot requests belong to sync"), RequestId::Router => {
crit!(self.log, "All BlobsByRoot requests belong to sync"; "peer_id" => %peer_id);
return;
}
}; };
trace!( trace!(

View File

@ -0,0 +1,473 @@
use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE;
use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, LookupVerifyError, SingleBlockLookup, SingleLookupRequestState, State,
};
use crate::sync::block_lookups::{
BlobRequestState, BlockLookups, BlockRequestState, PeerShouldHave,
SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
};
use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId};
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::CachedChildComponents;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{get_block_root, BeaconChainTypes};
use lighthouse_network::rpc::methods::BlobsByRootRequest;
use lighthouse_network::rpc::BlocksByRootRequest;
use lighthouse_network::PeerId;
use rand::prelude::IteratorRandom;
use ssz_types::VariableList;
use std::ops::IndexMut;
use std::sync::Arc;
use std::time::Duration;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock};
#[derive(Debug, Copy, Clone)]
pub enum ResponseType {
Block,
Blob,
}
#[derive(Debug, Copy, Clone)]
pub enum LookupType {
Current,
Parent,
}
/// This trait helps differentiate `SingleBlockLookup`s from `ParentLookup`s .This is useful in
/// ensuring requests and responses are handled separately and enables us to use different failure
/// tolerances for each, while re-using the same basic request and retry logic.
pub trait Lookup {
const MAX_ATTEMPTS: u8;
fn lookup_type() -> LookupType;
fn max_attempts() -> u8 {
Self::MAX_ATTEMPTS
}
}
/// A `Lookup` that is a part of a `ParentLookup`.
pub struct Parent;
impl Lookup for Parent {
const MAX_ATTEMPTS: u8 = PARENT_FAIL_TOLERANCE;
fn lookup_type() -> LookupType {
LookupType::Parent
}
}
/// A `Lookup` that part of a single block lookup.
pub struct Current;
impl Lookup for Current {
const MAX_ATTEMPTS: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS;
fn lookup_type() -> LookupType {
LookupType::Current
}
}
/// This trait unifies common single block lookup functionality across blocks and blobs. This
/// includes making requests, verifying responses, and handling processing results. A
/// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is
/// implemented for each.
///
/// The use of the `ResponseType` associated type gives us a degree of type
/// safety when handling a block/blob response ensuring we only mutate the correct corresponding
/// state.
pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
/// The type of the request .
type RequestType;
/// A block or blob response.
type ResponseType;
/// The type created after validation.
type VerifiedResponseType: Clone;
/// We convert a `VerifiedResponseType` to this type prior to sending it to the beacon processor.
type ReconstructedResponseType;
/* Request building methods */
/// Construct a new request.
fn build_request(&mut self) -> Result<(PeerShouldHave, Self::RequestType), LookupRequestError> {
// Verify and construct request.
self.too_many_attempts()?;
let peer = self.get_peer()?;
let request = self.new_request();
Ok((peer, request))
}
/// Construct a new request and send it.
fn build_request_and_send(
&mut self,
id: Id,
already_downloaded: bool,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
// Check if request is necessary.
if already_downloaded || !matches!(self.get_state().state, State::AwaitingDownload) {
return Ok(());
}
// Construct request.
let (peer_id, request) = self.build_request()?;
// Update request state.
self.get_state_mut().state = State::Downloading { peer_id };
self.get_state_mut().req_counter += 1;
// Make request
let id = SingleLookupReqId {
id,
req_counter: self.get_state().req_counter,
};
Self::make_request(id, peer_id.to_peer_id(), request, cx)
}
/// Verify the current request has not exceeded the maximum number of attempts.
fn too_many_attempts(&self) -> Result<(), LookupRequestError> {
let max_attempts = L::max_attempts();
let request_state = self.get_state();
if request_state.failed_attempts() >= max_attempts {
let cannot_process =
request_state.failed_processing >= request_state.failed_downloading;
Err(LookupRequestError::TooManyAttempts { cannot_process })
} else {
Ok(())
}
}
/// Get the next peer to request. Draws from the set of peers we think should have both the
/// block and blob first. If that fails, we draw from the set of peers that may have either.
fn get_peer(&mut self) -> Result<PeerShouldHave, LookupRequestError> {
let request_state = self.get_state_mut();
let available_peer_opt = request_state
.available_peers
.iter()
.choose(&mut rand::thread_rng())
.copied()
.map(PeerShouldHave::BlockAndBlobs);
let Some(peer_id) = available_peer_opt.or_else(||request_state
.potential_peers
.iter()
.choose(&mut rand::thread_rng())
.copied()
.map(PeerShouldHave::Neither)) else {
return Err(LookupRequestError::NoPeers);
};
request_state.used_peers.insert(peer_id.to_peer_id());
Ok(peer_id)
}
/// Initialize `Self::RequestType`.
fn new_request(&self) -> Self::RequestType;
/// Send the request to the network service.
fn make_request(
id: SingleLookupReqId,
peer_id: PeerId,
request: Self::RequestType,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError>;
/* Response handling methods */
/// Verify the response is valid based on what we requested.
fn verify_response(
&mut self,
expected_block_root: Hash256,
response: Option<Self::ResponseType>,
) -> Result<Option<Self::VerifiedResponseType>, LookupVerifyError> {
let request_state = self.get_state_mut();
match request_state.state {
State::AwaitingDownload => {
request_state.register_failure_downloading();
Err(LookupVerifyError::ExtraBlocksReturned)
}
State::Downloading { peer_id } => {
self.verify_response_inner(expected_block_root, response, peer_id)
}
State::Processing { peer_id: _ } => match response {
Some(_) => {
// We sent the block for processing and received an extra block.
request_state.register_failure_downloading();
Err(LookupVerifyError::ExtraBlocksReturned)
}
None => {
// This is simply the stream termination and we are already processing the
// block
Ok(None)
}
},
}
}
/// The response verification unique to block or blobs.
fn verify_response_inner(
&mut self,
expected_block_root: Hash256,
response: Option<Self::ResponseType>,
peer_id: PeerShouldHave,
) -> Result<Option<Self::VerifiedResponseType>, LookupVerifyError>;
/// A getter for the parent root of the response. Returns an `Option` because we won't know
/// the blob parent if we don't end up getting any blobs in the response.
fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option<Hash256>;
/// Caches the verified response in the lookup if necessary. This is only necessary for lookups
/// triggered by `UnknownParent` errors.
fn add_to_child_components(
verified_response: Self::VerifiedResponseType,
components: &mut CachedChildComponents<T::EthSpec>,
);
/// Convert a verified response to the type we send to the beacon processor.
fn verified_to_reconstructed(
verified: Self::VerifiedResponseType,
) -> Self::ReconstructedResponseType;
/// Send the response to the beacon processor.
fn send_reconstructed_for_processing(
id: Id,
bl: &BlockLookups<T>,
block_root: Hash256,
verified: Self::ReconstructedResponseType,
duration: Duration,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError>;
/// Remove the peer from the lookup if it is useless.
fn remove_if_useless(&mut self, peer: &PeerId) {
self.get_state_mut().remove_peer_if_useless(peer)
}
/// Register a failure to process the block or blob.
fn register_failure_downloading(&mut self) {
self.get_state_mut().register_failure_downloading()
}
/* Utility methods */
/// Returns the `ResponseType` associated with this trait implementation. Useful in logging.
fn response_type() -> ResponseType;
/// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait.
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self;
/// A getter for a reference to the `SingleLookupRequestState` associated with this trait.
fn get_state(&self) -> &SingleLookupRequestState;
/// A getter for a mutable reference to the SingleLookupRequestState associated with this trait.
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState;
}
impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L> {
type RequestType = BlocksByRootRequest;
type ResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
type ReconstructedResponseType = RpcBlock<T::EthSpec>;
fn new_request(&self) -> BlocksByRootRequest {
BlocksByRootRequest::new(VariableList::from(vec![self.requested_block_root]))
}
fn make_request(
id: SingleLookupReqId,
peer_id: PeerId,
request: Self::RequestType,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.block_lookup_request(id, peer_id, request, L::lookup_type())
.map_err(LookupRequestError::SendFailed)
}
fn verify_response_inner(
&mut self,
expected_block_root: Hash256,
response: Option<Self::ResponseType>,
peer_id: PeerShouldHave,
) -> Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, LookupVerifyError> {
match response {
Some(block) => {
// Compute the block root using this specific function so that we can get timing
// metrics.
let block_root = get_block_root(&block);
if block_root != expected_block_root {
// return an error and drop the block
// NOTE: we take this is as a download failure to prevent counting the
// attempt as a chain failure, but simply a peer failure.
self.state.register_failure_downloading();
Err(LookupVerifyError::RootMismatch)
} else {
// Return the block for processing.
self.state.state = State::Processing { peer_id };
Ok(Some(block))
}
}
None => {
if peer_id.should_have_block() {
self.state.register_failure_downloading();
Err(LookupVerifyError::NoBlockReturned)
} else {
self.state.state = State::AwaitingDownload;
Err(LookupVerifyError::BenignFailure)
}
}
}
}
fn get_parent_root(verified_response: &Arc<SignedBeaconBlock<T::EthSpec>>) -> Option<Hash256> {
Some(verified_response.parent_root())
}
fn add_to_child_components(
verified_response: Arc<SignedBeaconBlock<T::EthSpec>>,
components: &mut CachedChildComponents<T::EthSpec>,
) {
components.add_cached_child_block(verified_response);
}
fn verified_to_reconstructed(
block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> RpcBlock<T::EthSpec> {
RpcBlock::new_without_blobs(block)
}
fn send_reconstructed_for_processing(
id: Id,
bl: &BlockLookups<T>,
block_root: Hash256,
constructed: RpcBlock<T::EthSpec>,
duration: Duration,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
bl.send_block_for_processing(
block_root,
constructed,
duration,
BlockProcessType::SingleBlock { id },
cx,
)
}
fn response_type() -> ResponseType {
ResponseType::Block
}
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self {
&mut request.block_request_state
}
fn get_state(&self) -> &SingleLookupRequestState {
&self.state
}
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState {
&mut self.state
}
}
impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L, T::EthSpec> {
type RequestType = BlobsByRootRequest;
type ResponseType = Arc<BlobSidecar<T::EthSpec>>;
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;
type ReconstructedResponseType = FixedBlobSidecarList<T::EthSpec>;
fn new_request(&self) -> BlobsByRootRequest {
BlobsByRootRequest {
blob_ids: VariableList::from(self.requested_ids.clone()),
}
}
fn make_request(
id: SingleLookupReqId,
peer_id: PeerId,
request: Self::RequestType,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.blob_lookup_request(id, peer_id, request, L::lookup_type())
.map_err(LookupRequestError::SendFailed)
}
fn verify_response_inner(
&mut self,
_expected_block_root: Hash256,
blob: Option<Self::ResponseType>,
peer_id: PeerShouldHave,
) -> Result<Option<FixedBlobSidecarList<T::EthSpec>>, LookupVerifyError> {
match blob {
Some(blob) => {
let received_id = blob.id();
if !self.requested_ids.contains(&received_id) {
self.state.register_failure_downloading();
Err(LookupVerifyError::UnrequestedBlobId)
} else {
// State should remain downloading until we receive the stream terminator.
self.requested_ids.retain(|id| *id != received_id);
let blob_index = blob.index;
if blob_index >= T::EthSpec::max_blobs_per_block() as u64 {
return Err(LookupVerifyError::InvalidIndex(blob.index));
}
*self.blob_download_queue.index_mut(blob_index as usize) = Some(blob);
Ok(None)
}
}
None => {
self.state.state = State::Processing { peer_id };
let blobs = std::mem::take(&mut self.blob_download_queue);
Ok(Some(blobs))
}
}
}
fn get_parent_root(verified_response: &FixedBlobSidecarList<T::EthSpec>) -> Option<Hash256> {
verified_response
.into_iter()
.filter_map(|blob| blob.as_ref())
.map(|blob| blob.block_parent_root)
.next()
}
fn add_to_child_components(
verified_response: FixedBlobSidecarList<T::EthSpec>,
components: &mut CachedChildComponents<T::EthSpec>,
) {
components.add_cached_child_blobs(verified_response);
}
fn verified_to_reconstructed(
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> FixedBlobSidecarList<T::EthSpec> {
blobs
}
fn send_reconstructed_for_processing(
id: Id,
bl: &BlockLookups<T>,
block_root: Hash256,
verified: FixedBlobSidecarList<T::EthSpec>,
duration: Duration,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
bl.send_blobs_for_processing(
block_root,
verified,
duration,
BlockProcessType::SingleBlob { id },
cx,
)
}
fn response_type() -> ResponseType {
ResponseType::Blob
}
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self {
&mut request.blob_request_state
}
fn get_state(&self) -> &SingleLookupRequestState {
&self.state
}
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState {
&mut self.state
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,18 +1,17 @@
use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup}; use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup};
use super::{BlobRequestId, BlockRequestId, DownloadedBlocks, PeerShouldHave, ResponseType}; use super::{DownloadedBlock, PeerShouldHave};
use crate::sync::block_lookups::single_block_lookup::{State, UnknownParentComponents}; use crate::sync::block_lookups::common::Parent;
use crate::sync::block_lookups::{RootBlobsTuple, RootBlockTuple}; use crate::sync::block_lookups::common::RequestState;
use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext};
use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::data_availability_checker::DataAvailabilityChecker;
use beacon_chain::BeaconChainTypes; use beacon_chain::BeaconChainTypes;
use itertools::Itertools;
use lighthouse_network::PeerId; use lighthouse_network::PeerId;
use std::sync::Arc; use std::sync::Arc;
use store::Hash256; use store::Hash256;
use strum::IntoStaticStr; use strum::IntoStaticStr;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, SignedBeaconBlock};
/// How many attempts we try to find a parent of a block before we give up trying. /// How many attempts we try to find a parent of a block before we give up trying.
pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5; pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5;
@ -26,9 +25,9 @@ pub(crate) struct ParentLookup<T: BeaconChainTypes> {
/// The root of the block triggering this parent request. /// The root of the block triggering this parent request.
chain_hash: Hash256, chain_hash: Hash256,
/// The blocks that have currently been downloaded. /// The blocks that have currently been downloaded.
downloaded_blocks: Vec<DownloadedBlocks<T::EthSpec>>, downloaded_blocks: Vec<DownloadedBlock<T::EthSpec>>,
/// Request of the last parent. /// Request of the last parent.
pub current_parent_request: SingleBlockLookup<PARENT_FAIL_TOLERANCE, T>, pub current_parent_request: SingleBlockLookup<Parent, T>,
} }
#[derive(Debug, PartialEq, Eq, IntoStaticStr)] #[derive(Debug, PartialEq, Eq, IntoStaticStr)]
@ -63,9 +62,15 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
parent_root: Hash256, parent_root: Hash256,
peer_id: PeerShouldHave, peer_id: PeerShouldHave,
da_checker: Arc<DataAvailabilityChecker<T>>, da_checker: Arc<DataAvailabilityChecker<T>>,
cx: &mut SyncNetworkContext<T>,
) -> Self { ) -> Self {
let current_parent_request = let current_parent_request = SingleBlockLookup::new(
SingleBlockLookup::new(parent_root, Some(<_>::default()), &[peer_id], da_checker); parent_root,
Some(<_>::default()),
&[peer_id],
da_checker,
cx.next_id(),
);
Self { Self {
chain_hash: block_root, chain_hash: block_root,
@ -85,116 +90,53 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
} }
/// Attempts to request the next unknown parent. If the request fails, it should be removed. /// Attempts to request the next unknown parent. If the request fails, it should be removed.
pub fn request_parent_block( pub fn request_parent(&mut self, cx: &SyncNetworkContext<T>) -> Result<(), RequestError> {
&mut self,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), RequestError> {
// check to make sure this request hasn't failed // check to make sure this request hasn't failed
if self.downloaded_blocks.len() + 1 >= PARENT_DEPTH_TOLERANCE { if self.downloaded_blocks.len() + 1 >= PARENT_DEPTH_TOLERANCE {
return Err(RequestError::ChainTooLong); return Err(RequestError::ChainTooLong);
} }
if let Some((peer_id, request)) = self.current_parent_request.request_block()? { self.current_parent_request
match cx.parent_lookup_block_request(peer_id, request) { .request_block_and_blobs(cx)
Ok(request_id) => { .map_err(Into::into)
self.current_parent_request.id.block_request_id = Some(request_id);
return Ok(());
}
Err(reason) => {
self.current_parent_request.id.block_request_id = None;
return Err(RequestError::SendFailed(reason));
}
}
}
Ok(())
} }
pub fn request_parent_blobs( pub fn check_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> {
&mut self,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), RequestError> {
// check to make sure this request hasn't failed
if self.downloaded_blocks.len() + 1 >= PARENT_DEPTH_TOLERANCE {
return Err(RequestError::ChainTooLong);
}
if let Some((peer_id, request)) = self.current_parent_request.request_blobs()? {
match cx.parent_lookup_blobs_request(peer_id, request) {
Ok(request_id) => {
self.current_parent_request.id.blob_request_id = Some(request_id);
return Ok(());
}
Err(reason) => {
self.current_parent_request.id.blob_request_id = None;
return Err(RequestError::SendFailed(reason));
}
}
}
Ok(())
}
pub fn check_block_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> {
self.current_parent_request self.current_parent_request
.block_request_state .block_request_state
.state .state
.check_peer_disconnected(peer_id) .check_peer_disconnected(peer_id)
} .and_then(|()| {
pub fn check_blob_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> {
self.current_parent_request self.current_parent_request
.blob_request_state .blob_request_state
.state .state
.check_peer_disconnected(peer_id) .check_peer_disconnected(peer_id)
})
} }
pub fn add_unknown_parent_block(&mut self, block: RpcBlock<T::EthSpec>) { pub fn add_unknown_parent_block(&mut self, block: RpcBlock<T::EthSpec>) {
let next_parent = block.parent_root(); let next_parent = block.parent_root();
// Cache the block. // Cache the block.
let current_root = self let current_root = self.current_parent_request.block_root();
.current_parent_request
.block_request_state
.requested_block_root;
self.downloaded_blocks.push((current_root, block)); self.downloaded_blocks.push((current_root, block));
// Update the block request. // Update the parent request.
self.current_parent_request
.update_requested_parent_block(next_parent)
}
pub fn block_processing_peer(&self) -> Result<PeerShouldHave, ()> {
self.current_parent_request self.current_parent_request
.block_request_state .block_request_state
.requested_block_root = next_parent; .state
self.current_parent_request.block_request_state.state.state = State::AwaitingDownload; .processing_peer()
self.current_parent_request.id.block_request_id = None;
// Update the blobs request.
self.current_parent_request.blob_request_state.state.state = State::AwaitingDownload;
self.current_parent_request.id.blob_request_id = None;
// Reset the unknown parent components.
self.current_parent_request.unknown_parent_components =
Some(UnknownParentComponents::default());
} }
pub fn add_current_request_block(&mut self, block: Arc<SignedBeaconBlock<T::EthSpec>>) { pub fn blob_processing_peer(&self) -> Result<PeerShouldHave, ()> {
// Cache the block. self.current_parent_request
self.current_parent_request.add_unknown_parent_block(block); .blob_request_state
.state
// Update the request. .processing_peer()
self.current_parent_request.id.block_request_id = None;
}
pub fn add_current_request_blobs(&mut self, blobs: FixedBlobSidecarList<T::EthSpec>) {
// Cache the blobs.
self.current_parent_request.add_unknown_parent_blobs(blobs);
// Update the request.
self.current_parent_request.id.blob_request_id = None;
}
pub fn pending_block_response(&self, req_id: BlockRequestId) -> bool {
self.current_parent_request.id.block_request_id == Some(req_id)
}
pub fn pending_blob_response(&self, req_id: BlobRequestId) -> bool {
self.current_parent_request.id.blob_request_id == Some(req_id)
} }
/// Consumes the parent request and destructures it into it's parts. /// Consumes the parent request and destructures it into it's parts.
@ -205,7 +147,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
Hash256, Hash256,
Vec<RpcBlock<T::EthSpec>>, Vec<RpcBlock<T::EthSpec>>,
Vec<Hash256>, Vec<Hash256>,
SingleBlockLookup<PARENT_FAIL_TOLERANCE, T>, SingleBlockLookup<Parent, T>,
) { ) {
let ParentLookup { let ParentLookup {
chain_hash, chain_hash,
@ -227,73 +169,40 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
self.chain_hash self.chain_hash
} }
pub fn block_download_failed(&mut self) { pub fn processing_failed(&mut self) {
self.current_parent_request
.block_request_state
.state
.register_failure_downloading();
self.current_parent_request.id.block_request_id = None;
}
pub fn blob_download_failed(&mut self) {
self.current_parent_request
.blob_request_state
.state
.register_failure_downloading();
self.current_parent_request.id.blob_request_id = None;
}
pub fn block_processing_failed(&mut self) {
self.current_parent_request self.current_parent_request
.block_request_state .block_request_state
.state .state
.register_failure_processing(); .register_failure_processing();
if let Some(components) = self self.current_parent_request
.current_parent_request .blob_request_state
.unknown_parent_components .state
.as_mut() .register_failure_processing();
{ if let Some(components) = self.current_parent_request.cached_child_components.as_mut() {
components.downloaded_block = None; components.downloaded_block = None;
}
self.current_parent_request.id.block_request_id = None;
}
pub fn blob_processing_failed(&mut self) {
self.current_parent_request
.blob_request_state
.state
.register_failure_processing();
if let Some(components) = self
.current_parent_request
.unknown_parent_components
.as_mut()
{
components.downloaded_blobs = <_>::default(); components.downloaded_blobs = <_>::default();
} }
self.current_parent_request.id.blob_request_id = None;
} }
/// Verifies that the received block is what we requested. If so, parent lookup now waits for /// Verifies that the received block is what we requested. If so, parent lookup now waits for
/// the processing result of the block. /// the processing result of the block.
pub fn verify_block( pub fn verify_response<R: RequestState<Parent, T>>(
&mut self, &mut self,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>, block: Option<R::ResponseType>,
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>, failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
) -> Result<Option<RootBlockTuple<T::EthSpec>>, ParentVerifyError> { ) -> Result<Option<R::VerifiedResponseType>, ParentVerifyError> {
let root_and_block = self.current_parent_request.verify_block(block)?; let expected_block_root = self.current_parent_request.block_root();
let request_state = R::request_state_mut(&mut self.current_parent_request);
let root_and_block = request_state.verify_response(expected_block_root, block)?;
// check if the parent of this block isn't in the failed cache. If it is, this chain should // check if the parent of this block isn't in the failed cache. If it is, this chain should
// be dropped and the peer downscored. // be dropped and the peer downscored.
if let Some(parent_root) = root_and_block if let Some(parent_root) = root_and_block
.as_ref() .as_ref()
.map(|(_, block)| block.parent_root()) .and_then(|block| R::get_parent_root(block))
{ {
if failed_chains.contains(&parent_root) { if failed_chains.contains(&parent_root) {
self.current_parent_request request_state.register_failure_downloading();
.block_request_state
.state
.register_failure_downloading();
self.current_parent_request.id.block_request_id = None;
return Err(ParentVerifyError::PreviousFailure { parent_root }); return Err(ParentVerifyError::PreviousFailure { parent_root });
} }
} }
@ -301,49 +210,24 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
Ok(root_and_block) Ok(root_and_block)
} }
pub fn verify_blob(
&mut self,
blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
) -> Result<Option<RootBlobsTuple<T::EthSpec>>, ParentVerifyError> {
let parent_root_opt = blob.as_ref().map(|b| b.block_parent_root);
let blobs = self.current_parent_request.verify_blob(blob)?;
// check if the parent of this block isn't in the failed cache. If it is, this chain should
// be dropped and the peer downscored.
if let Some(parent_root) = parent_root_opt {
if failed_chains.contains(&parent_root) {
self.current_parent_request
.blob_request_state
.state
.register_failure_downloading();
self.current_parent_request.id.blob_request_id = None;
return Err(ParentVerifyError::PreviousFailure { parent_root });
}
}
Ok(blobs)
}
pub fn add_peers(&mut self, peer_source: &[PeerShouldHave]) { pub fn add_peers(&mut self, peer_source: &[PeerShouldHave]) {
self.current_parent_request.add_peers(peer_source) self.current_parent_request.add_peers(peer_source)
} }
pub fn used_peers(&self, response_type: ResponseType) -> impl Iterator<Item = &PeerId> + '_ { pub fn used_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
match response_type { self.current_parent_request
ResponseType::Block => self
.current_parent_request
.block_request_state .block_request_state
.state .state
.used_peers .used_peers
.iter(), .iter()
ResponseType::Blob => self .chain(
.current_parent_request self.current_parent_request
.blob_request_state .blob_request_state
.state .state
.used_peers .used_peers
.iter(), .iter(),
} )
.unique()
} }
} }
@ -371,6 +255,7 @@ impl From<LookupRequestError> for RequestError {
RequestError::TooManyAttempts { cannot_process } RequestError::TooManyAttempts { cannot_process }
} }
E::NoPeers => RequestError::NoPeers, E::NoPeers => RequestError::NoPeers,
E::SendFailed(msg) => RequestError::SendFailed(msg),
} }
} }
} }

View File

@ -1,12 +1,13 @@
use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::service::RequestId; use crate::service::RequestId;
use crate::sync::manager::RequestId as SyncId; use crate::sync::manager::{RequestId as SyncId, SingleLookupReqId};
use crate::NetworkMessage; use crate::NetworkMessage;
use std::sync::Arc; use std::sync::Arc;
use super::*; use super::*;
use crate::sync::block_lookups::common::ResponseType;
use beacon_chain::builder::Witness; use beacon_chain::builder::Witness;
use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::eth1_chain::CachingEth1Backend;
use beacon_chain::test_utils::{build_log, BeaconChainHarness, EphemeralHarnessType}; use beacon_chain::test_utils::{build_log, BeaconChainHarness, EphemeralHarnessType};
@ -20,7 +21,8 @@ use tokio::sync::mpsc;
use types::{ use types::{
map_fork_name, map_fork_name_with, map_fork_name, map_fork_name_with,
test_utils::{SeedableRng, TestRandom, XorShiftRng}, test_utils::{SeedableRng, TestRandom, XorShiftRng},
BeaconBlock, EthSpec, ForkName, FullPayloadDeneb, MinimalEthSpec as E, SignedBeaconBlock, BeaconBlock, BlobSidecar, EthSpec, ForkName, FullPayloadDeneb, MinimalEthSpec as E,
SignedBeaconBlock,
}; };
type T = Witness<ManualSlotClock, CachingEth1Backend<E>, E, MemoryStore<E>, MemoryStore<E>>; type T = Witness<ManualSlotClock, CachingEth1Backend<E>, E, MemoryStore<E>, MemoryStore<E>>;
@ -155,7 +157,7 @@ impl TestRig {
} }
#[track_caller] #[track_caller]
fn expect_block_request(&mut self, response_type: ResponseType) -> Id { fn expect_lookup_request(&mut self, response_type: ResponseType) -> SingleLookupReqId {
match response_type { match response_type {
ResponseType::Block => match self.network_rx.try_recv() { ResponseType::Block => match self.network_rx.try_recv() {
Ok(NetworkMessage::SendRequest { Ok(NetworkMessage::SendRequest {
@ -171,7 +173,7 @@ impl TestRig {
Ok(NetworkMessage::SendRequest { Ok(NetworkMessage::SendRequest {
peer_id: _, peer_id: _,
request: Request::BlobsByRoot(_request), request: Request::BlobsByRoot(_request),
request_id: RequestId::Sync(SyncId::SingleBlock { id }), request_id: RequestId::Sync(SyncId::SingleBlob { id }),
}) => id, }) => id,
other => { other => {
panic!("Expected blob request, found {:?}", other); panic!("Expected blob request, found {:?}", other);
@ -181,7 +183,7 @@ impl TestRig {
} }
#[track_caller] #[track_caller]
fn expect_parent_request(&mut self, response_type: ResponseType) -> Id { fn expect_parent_request(&mut self, response_type: ResponseType) -> SingleLookupReqId {
match response_type { match response_type {
ResponseType::Block => match self.network_rx.try_recv() { ResponseType::Block => match self.network_rx.try_recv() {
Ok(NetworkMessage::SendRequest { Ok(NetworkMessage::SendRequest {
@ -195,7 +197,7 @@ impl TestRig {
Ok(NetworkMessage::SendRequest { Ok(NetworkMessage::SendRequest {
peer_id: _, peer_id: _,
request: Request::BlobsByRoot(_request), request: Request::BlobsByRoot(_request),
request_id: RequestId::Sync(SyncId::ParentLookup { id }), request_id: RequestId::Sync(SyncId::ParentLookupBlob { id }),
}) => id, }) => id,
other => panic!("Expected parent blobs request, found {:?}", other), other => panic!("Expected parent blobs request, found {:?}", other),
}, },
@ -295,16 +297,22 @@ fn test_single_block_lookup_happy_path() {
let block_root = block.canonical_root(); let block_root = block.canonical_root();
// Trigger the request // Trigger the request
bl.search_block(block_root, PeerShouldHave::BlockAndBlobs(peer_id), &mut cx); bl.search_block(block_root, PeerShouldHave::BlockAndBlobs(peer_id), &mut cx);
let id = rig.expect_block_request(response_type); let id = rig.expect_lookup_request(response_type);
// If we're in deneb, a blob request should have been triggered as well, // If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) { if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_block_request(ResponseType::Blob); let _ = rig.expect_lookup_request(ResponseType::Blob);
} }
// The peer provides the correct block, should not be penalized. Now the block should be sent // The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing. // for processing.
bl.single_block_lookup_response(id, peer_id, Some(block.into()), D, &mut cx); bl.single_lookup_response::<BlockRequestState<Current>>(
id,
peer_id,
Some(block.into()),
D,
&cx,
);
rig.expect_empty_network(); rig.expect_empty_network();
rig.expect_block_process(response_type); rig.expect_block_process(response_type);
@ -313,11 +321,10 @@ fn test_single_block_lookup_happy_path() {
// Send the stream termination. Peer should have not been penalized, and the request removed // Send the stream termination. Peer should have not been penalized, and the request removed
// after processing. // after processing.
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); bl.single_lookup_response::<BlockRequestState<Current>>(id, peer_id, None, D, &cx);
bl.single_block_component_processed( bl.single_block_component_processed::<BlockRequestState<Current>>(
id, id.id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
response_type,
&mut cx, &mut cx,
); );
rig.expect_empty_network(); rig.expect_empty_network();
@ -338,18 +345,18 @@ fn test_single_block_lookup_empty_response() {
// Trigger the request // Trigger the request
bl.search_block(block_hash, PeerShouldHave::BlockAndBlobs(peer_id), &mut cx); bl.search_block(block_hash, PeerShouldHave::BlockAndBlobs(peer_id), &mut cx);
let id = rig.expect_block_request(response_type); let id = rig.expect_lookup_request(response_type);
// If we're in deneb, a blob request should have been triggered as well, // If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) { if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_block_request(ResponseType::Blob); let _ = rig.expect_lookup_request(ResponseType::Blob);
} }
// The peer does not have the block. It should be penalized. // The peer does not have the block. It should be penalized.
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); bl.single_lookup_response::<BlockRequestState<Current>>(id, peer_id, None, D, &cx);
rig.expect_penalty(); rig.expect_penalty();
rig.expect_block_request(response_type); // it should be retried rig.expect_lookup_request(response_type); // it should be retried
} }
#[test] #[test]
@ -366,21 +373,27 @@ fn test_single_block_lookup_wrong_response() {
// Trigger the request // Trigger the request
bl.search_block(block_hash, PeerShouldHave::BlockAndBlobs(peer_id), &mut cx); bl.search_block(block_hash, PeerShouldHave::BlockAndBlobs(peer_id), &mut cx);
let id = rig.expect_block_request(response_type); let id = rig.expect_lookup_request(response_type);
// If we're in deneb, a blob request should have been triggered as well, // If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) { if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_block_request(ResponseType::Blob); let _ = rig.expect_lookup_request(ResponseType::Blob);
} }
// Peer sends something else. It should be penalized. // Peer sends something else. It should be penalized.
let bad_block = rig.rand_block(fork_name); let bad_block = rig.rand_block(fork_name);
bl.single_block_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); bl.single_lookup_response::<BlockRequestState<Current>>(
id,
peer_id,
Some(bad_block.into()),
D,
&cx,
);
rig.expect_penalty(); rig.expect_penalty();
rig.expect_block_request(response_type); // should be retried rig.expect_lookup_request(response_type); // should be retried
// Send the stream termination. This should not produce an additional penalty. // Send the stream termination. This should not produce an additional penalty.
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); bl.single_lookup_response::<BlockRequestState<Current>>(id, peer_id, None, D, &cx);
rig.expect_empty_network(); rig.expect_empty_network();
} }
@ -398,16 +411,21 @@ fn test_single_block_lookup_failure() {
// Trigger the request // Trigger the request
bl.search_block(block_hash, PeerShouldHave::BlockAndBlobs(peer_id), &mut cx); bl.search_block(block_hash, PeerShouldHave::BlockAndBlobs(peer_id), &mut cx);
let id = rig.expect_block_request(response_type); let id = rig.expect_lookup_request(response_type);
// If we're in deneb, a blob request should have been triggered as well, // If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) { if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_block_request(ResponseType::Blob); let _ = rig.expect_lookup_request(ResponseType::Blob);
} }
// The request fails. RPC failures are handled elsewhere so we should not penalize the peer. // The request fails. RPC failures are handled elsewhere so we should not penalize the peer.
bl.single_block_lookup_failed(id, &peer_id, &mut cx, RPCError::UnsupportedProtocol); bl.single_block_lookup_failed::<BlockRequestState<Current>>(
rig.expect_block_request(response_type); id,
&peer_id,
&cx,
RPCError::UnsupportedProtocol,
);
rig.expect_lookup_request(response_type);
rig.expect_empty_network(); rig.expect_empty_network();
} }
@ -429,16 +447,22 @@ fn test_single_block_lookup_becomes_parent_request() {
PeerShouldHave::BlockAndBlobs(peer_id), PeerShouldHave::BlockAndBlobs(peer_id),
&mut cx, &mut cx,
); );
let id = rig.expect_block_request(response_type); let id = rig.expect_lookup_request(response_type);
// If we're in deneb, a blob request should have been triggered as well, // If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) { if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_block_request(ResponseType::Blob); let _ = rig.expect_lookup_request(ResponseType::Blob);
} }
// The peer provides the correct block, should not be penalized. Now the block should be sent // The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing. // for processing.
bl.single_block_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); bl.single_lookup_response::<BlockRequestState<Current>>(
id,
peer_id,
Some(block.clone()),
D,
&cx,
);
rig.expect_empty_network(); rig.expect_empty_network();
rig.expect_block_process(response_type); rig.expect_block_process(response_type);
@ -447,10 +471,9 @@ fn test_single_block_lookup_becomes_parent_request() {
// Send the stream termination. Peer should have not been penalized, and the request moved to a // Send the stream termination. Peer should have not been penalized, and the request moved to a
// parent request after processing. // parent request after processing.
bl.single_block_component_processed( bl.single_block_component_processed::<BlockRequestState<Current>>(
id, id.id,
BlockError::ParentUnknown(block.into()).into(), BlockError::ParentUnknown(block.into()).into(),
response_type,
&mut cx, &mut cx,
); );
assert_eq!(bl.single_block_lookups.len(), 1); assert_eq!(bl.single_block_lookups.len(), 1);
@ -491,22 +514,23 @@ fn test_parent_lookup_happy_path() {
} }
// Peer sends the right block, it should be sent for processing. Peer should not be penalized. // Peer sends the right block, it should be sent for processing. Peer should not be penalized.
bl.parent_lookup_response(id, peer_id, Some(parent.into()), D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(
id,
peer_id,
Some(parent.into()),
D,
&cx,
);
rig.expect_block_process(response_type); rig.expect_block_process(response_type);
rig.expect_empty_network(); rig.expect_empty_network();
// Processing succeeds, now the rest of the chain should be sent for processing. // Processing succeeds, now the rest of the chain should be sent for processing.
bl.parent_block_processed( bl.parent_block_processed(chain_hash, BlockError::BlockIsAlreadyKnown.into(), &mut cx);
chain_hash,
BlockError::BlockIsAlreadyKnown.into(),
response_type,
&mut cx,
);
rig.expect_parent_chain_process(); rig.expect_parent_chain_process();
let process_result = BatchProcessResult::Success { let process_result = BatchProcessResult::Success {
was_non_empty: true, was_non_empty: true,
}; };
bl.parent_chain_processed(chain_hash, process_result, &mut cx); bl.parent_chain_processed(chain_hash, process_result, &cx);
assert_eq!(bl.parent_lookups.len(), 0); assert_eq!(bl.parent_lookups.len(), 0);
} }
@ -538,30 +562,41 @@ fn test_parent_lookup_wrong_response() {
// Peer sends the wrong block, peer should be penalized and the block re-requested. // Peer sends the wrong block, peer should be penalized and the block re-requested.
let bad_block = rig.rand_block(fork_name); let bad_block = rig.rand_block(fork_name);
bl.parent_lookup_response(id1, peer_id, Some(bad_block.into()), D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(
id1,
peer_id,
Some(bad_block.into()),
D,
&cx,
);
rig.expect_penalty(); rig.expect_penalty();
let id2 = rig.expect_parent_request(response_type); let id2 = rig.expect_parent_request(response_type);
// Send the stream termination for the first request. This should not produce extra penalties. // Send the stream termination for the first request. This should not produce extra penalties.
bl.parent_lookup_response(id1, peer_id, None, D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(id1, peer_id, None, D, &cx);
rig.expect_empty_network(); rig.expect_empty_network();
// Send the right block this time. // Send the right block this time.
bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(
id2,
peer_id,
Some(parent.into()),
D,
&cx,
);
rig.expect_block_process(response_type); rig.expect_block_process(response_type);
// Processing succeeds, now the rest of the chain should be sent for processing. // Processing succeeds, now the rest of the chain should be sent for processing.
bl.parent_block_processed( bl.parent_block_processed(
chain_hash, chain_hash,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
response_type,
&mut cx, &mut cx,
); );
rig.expect_parent_chain_process(); rig.expect_parent_chain_process();
let process_result = BatchProcessResult::Success { let process_result = BatchProcessResult::Success {
was_non_empty: true, was_non_empty: true,
}; };
bl.parent_chain_processed(chain_hash, process_result, &mut cx); bl.parent_chain_processed(chain_hash, process_result, &cx);
assert_eq!(bl.parent_lookups.len(), 0); assert_eq!(bl.parent_lookups.len(), 0);
} }
@ -592,26 +627,31 @@ fn test_parent_lookup_empty_response() {
} }
// Peer sends an empty response, peer should be penalized and the block re-requested. // Peer sends an empty response, peer should be penalized and the block re-requested.
bl.parent_lookup_response(id1, peer_id, None, D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(id1, peer_id, None, D, &cx);
rig.expect_penalty(); rig.expect_penalty();
let id2 = rig.expect_parent_request(response_type); let id2 = rig.expect_parent_request(response_type);
// Send the right block this time. // Send the right block this time.
bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(
id2,
peer_id,
Some(parent.into()),
D,
&cx,
);
rig.expect_block_process(response_type); rig.expect_block_process(response_type);
// Processing succeeds, now the rest of the chain should be sent for processing. // Processing succeeds, now the rest of the chain should be sent for processing.
bl.parent_block_processed( bl.parent_block_processed(
chain_hash, chain_hash,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
response_type,
&mut cx, &mut cx,
); );
rig.expect_parent_chain_process(); rig.expect_parent_chain_process();
let process_result = BatchProcessResult::Success { let process_result = BatchProcessResult::Success {
was_non_empty: true, was_non_empty: true,
}; };
bl.parent_chain_processed(chain_hash, process_result, &mut cx); bl.parent_chain_processed(chain_hash, process_result, &cx);
assert_eq!(bl.parent_lookups.len(), 0); assert_eq!(bl.parent_lookups.len(), 0);
} }
@ -642,10 +682,10 @@ fn test_parent_lookup_rpc_failure() {
} }
// The request fails. It should be tried again. // The request fails. It should be tried again.
bl.parent_lookup_failed( bl.parent_lookup_failed::<BlockRequestState<Parent>>(
id1, id1,
peer_id, peer_id,
&mut cx, &cx,
RPCError::ErrorResponse( RPCError::ErrorResponse(
RPCResponseErrorCode::ResourceUnavailable, RPCResponseErrorCode::ResourceUnavailable,
"older than deneb".into(), "older than deneb".into(),
@ -654,21 +694,26 @@ fn test_parent_lookup_rpc_failure() {
let id2 = rig.expect_parent_request(response_type); let id2 = rig.expect_parent_request(response_type);
// Send the right block this time. // Send the right block this time.
bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(
id2,
peer_id,
Some(parent.into()),
D,
&cx,
);
rig.expect_block_process(response_type); rig.expect_block_process(response_type);
// Processing succeeds, now the rest of the chain should be sent for processing. // Processing succeeds, now the rest of the chain should be sent for processing.
bl.parent_block_processed( bl.parent_block_processed(
chain_hash, chain_hash,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
response_type,
&mut cx, &mut cx,
); );
rig.expect_parent_chain_process(); rig.expect_parent_chain_process();
let process_result = BatchProcessResult::Success { let process_result = BatchProcessResult::Success {
was_non_empty: true, was_non_empty: true,
}; };
bl.parent_chain_processed(chain_hash, process_result, &mut cx); bl.parent_chain_processed(chain_hash, process_result, &cx);
assert_eq!(bl.parent_lookups.len(), 0); assert_eq!(bl.parent_lookups.len(), 0);
} }
@ -701,10 +746,10 @@ fn test_parent_lookup_too_many_attempts() {
// make sure every error is accounted for // make sure every error is accounted for
0 => { 0 => {
// The request fails. It should be tried again. // The request fails. It should be tried again.
bl.parent_lookup_failed( bl.parent_lookup_failed::<BlockRequestState<Parent>>(
id, id,
peer_id, peer_id,
&mut cx, &cx,
RPCError::ErrorResponse( RPCError::ErrorResponse(
RPCResponseErrorCode::ResourceUnavailable, RPCResponseErrorCode::ResourceUnavailable,
"older than deneb".into(), "older than deneb".into(),
@ -714,9 +759,23 @@ fn test_parent_lookup_too_many_attempts() {
_ => { _ => {
// Send a bad block this time. It should be tried again. // Send a bad block this time. It should be tried again.
let bad_block = rig.rand_block(fork_name); let bad_block = rig.rand_block(fork_name);
bl.parent_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(
id,
peer_id,
Some(bad_block.into()),
D,
&cx,
);
// Send the stream termination // Send the stream termination
bl.parent_lookup_response(id, peer_id, None, D, &mut cx);
// Note, previously we would send the same lookup id with a stream terminator,
// we'd ignore it because we'd intrepret it as an unrequested response, since
// we already got one response for the block. I'm not sure what the intent is
// for having this stream terminator line in this test at all. Receiving an invalid
// block and a stream terminator with the same Id now results in two failed attempts,
// I'm unsure if this is how it should behave?
//
bl.parent_lookup_response::<BlockRequestState<Parent>>(id, peer_id, None, D, &cx);
rig.expect_penalty(); rig.expect_penalty();
} }
} }
@ -764,10 +823,10 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() {
} }
if i % 2 != 0 { if i % 2 != 0 {
// The request fails. It should be tried again. // The request fails. It should be tried again.
bl.parent_lookup_failed( bl.parent_lookup_failed::<BlockRequestState<Parent>>(
id, id,
peer_id, peer_id,
&mut cx, &cx,
RPCError::ErrorResponse( RPCError::ErrorResponse(
RPCResponseErrorCode::ResourceUnavailable, RPCResponseErrorCode::ResourceUnavailable,
"older than deneb".into(), "older than deneb".into(),
@ -776,7 +835,13 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() {
} else { } else {
// Send a bad block this time. It should be tried again. // Send a bad block this time. It should be tried again.
let bad_block = rig.rand_block(fork_name); let bad_block = rig.rand_block(fork_name);
bl.parent_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(
id,
peer_id,
Some(bad_block.into()),
D,
&cx,
);
rig.expect_penalty(); rig.expect_penalty();
} }
if i < parent_lookup::PARENT_FAIL_TOLERANCE { if i < parent_lookup::PARENT_FAIL_TOLERANCE {
@ -825,10 +890,10 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() {
let _ = rig.expect_parent_request(ResponseType::Blob); let _ = rig.expect_parent_request(ResponseType::Blob);
} }
// The request fails. It should be tried again. // The request fails. It should be tried again.
bl.parent_lookup_failed( bl.parent_lookup_failed::<BlockRequestState<Parent>>(
id, id,
peer_id, peer_id,
&mut cx, &cx,
RPCError::ErrorResponse( RPCError::ErrorResponse(
RPCResponseErrorCode::ResourceUnavailable, RPCResponseErrorCode::ResourceUnavailable,
"older than deneb".into(), "older than deneb".into(),
@ -846,14 +911,15 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() {
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
assert!(!bl.failed_chains.contains(&block_root)); assert!(!bl.failed_chains.contains(&block_root));
// send the right parent but fail processing // send the right parent but fail processing
bl.parent_lookup_response(id, peer_id, Some(parent.clone()), D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(
bl.parent_block_processed( id,
block_root, peer_id,
BlockError::InvalidSignature.into(), Some(parent.clone()),
response_type, D,
&mut cx, &cx,
); );
bl.parent_lookup_response(id, peer_id, None, D, &mut cx); bl.parent_block_processed(block_root, BlockError::InvalidSignature.into(), &mut cx);
bl.parent_lookup_response::<BlockRequestState<Parent>>(id, peer_id, None, D, &cx);
rig.expect_penalty(); rig.expect_penalty();
} }
@ -902,16 +968,21 @@ fn test_parent_lookup_too_deep() {
let _ = rig.expect_parent_request(ResponseType::Blob); let _ = rig.expect_parent_request(ResponseType::Blob);
} }
// the block // the block
bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(
id,
peer_id,
Some(block.clone()),
D,
&cx,
);
// the stream termination // the stream termination
bl.parent_lookup_response(id, peer_id, None, D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(id, peer_id, None, D, &cx);
// the processing request // the processing request
rig.expect_block_process(response_type); rig.expect_block_process(response_type);
// the processing result // the processing result
bl.parent_block_processed( bl.parent_block_processed(
chain_hash, chain_hash,
BlockError::ParentUnknown(block.into()).into(), BlockError::ParentUnknown(block.into()).into(),
response_type,
&mut cx, &mut cx,
) )
} }
@ -962,16 +1033,22 @@ fn test_single_block_lookup_ignored_response() {
PeerShouldHave::BlockAndBlobs(peer_id), PeerShouldHave::BlockAndBlobs(peer_id),
&mut cx, &mut cx,
); );
let id = rig.expect_block_request(response_type); let id = rig.expect_lookup_request(response_type);
// If we're in deneb, a blob request should have been triggered as well, // If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) { if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_block_request(ResponseType::Blob); let _ = rig.expect_lookup_request(ResponseType::Blob);
} }
// The peer provides the correct block, should not be penalized. Now the block should be sent // The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing. // for processing.
bl.single_block_lookup_response(id, peer_id, Some(block.into()), D, &mut cx); bl.single_lookup_response::<BlockRequestState<Current>>(
id,
peer_id,
Some(block.into()),
D,
&cx,
);
rig.expect_empty_network(); rig.expect_empty_network();
rig.expect_block_process(response_type); rig.expect_block_process(response_type);
@ -980,9 +1057,13 @@ fn test_single_block_lookup_ignored_response() {
// Send the stream termination. Peer should have not been penalized, and the request removed // Send the stream termination. Peer should have not been penalized, and the request removed
// after processing. // after processing.
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); bl.single_lookup_response::<BlockRequestState<Current>>(id, peer_id, None, D, &cx);
// Send an Ignored response, the request should be dropped // Send an Ignored response, the request should be dropped
bl.single_block_component_processed(id, BlockProcessingResult::Ignored, response_type, &mut cx); bl.single_block_component_processed::<BlockRequestState<Current>>(
id.id,
BlockProcessingResult::Ignored,
&mut cx,
);
rig.expect_empty_network(); rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 0); assert_eq!(bl.single_block_lookups.len(), 0);
} }
@ -1015,17 +1096,18 @@ fn test_parent_lookup_ignored_response() {
} }
// Peer sends the right block, it should be sent for processing. Peer should not be penalized. // Peer sends the right block, it should be sent for processing. Peer should not be penalized.
bl.parent_lookup_response(id, peer_id, Some(parent.into()), D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(
id,
peer_id,
Some(parent.into()),
D,
&cx,
);
rig.expect_block_process(response_type); rig.expect_block_process(response_type);
rig.expect_empty_network(); rig.expect_empty_network();
// Return an Ignored result. The request should be dropped // Return an Ignored result. The request should be dropped
bl.parent_block_processed( bl.parent_block_processed(chain_hash, BlockProcessingResult::Ignored, &mut cx);
chain_hash,
BlockProcessingResult::Ignored,
response_type,
&mut cx,
);
rig.expect_empty_network(); rig.expect_empty_network();
assert_eq!(bl.parent_lookups.len(), 0); assert_eq!(bl.parent_lookups.len(), 0);
} }
@ -1092,25 +1174,25 @@ fn test_same_chain_race_condition() {
let _ = rig.expect_parent_request(ResponseType::Blob); let _ = rig.expect_parent_request(ResponseType::Blob);
} }
// the block // the block
bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(
id,
peer_id,
Some(block.clone()),
D,
&cx,
);
// the stream termination // the stream termination
bl.parent_lookup_response(id, peer_id, None, D, &mut cx); bl.parent_lookup_response::<BlockRequestState<Parent>>(id, peer_id, None, D, &cx);
// the processing request // the processing request
rig.expect_block_process(response_type); rig.expect_block_process(response_type);
// the processing result // the processing result
if i + 2 == depth { if i + 2 == depth {
// one block was removed // one block was removed
bl.parent_block_processed( bl.parent_block_processed(chain_hash, BlockError::BlockIsAlreadyKnown.into(), &mut cx)
chain_hash,
BlockError::BlockIsAlreadyKnown.into(),
response_type,
&mut cx,
)
} else { } else {
bl.parent_block_processed( bl.parent_block_processed(
chain_hash, chain_hash,
BlockError::ParentUnknown(block.into()).into(), BlockError::ParentUnknown(block.into()).into(),
response_type,
&mut cx, &mut cx,
) )
} }
@ -1137,12 +1219,13 @@ fn test_same_chain_race_condition() {
let process_result = BatchProcessResult::Success { let process_result = BatchProcessResult::Success {
was_non_empty: true, was_non_empty: true,
}; };
bl.parent_chain_processed(chain_hash, process_result, &mut cx); bl.parent_chain_processed(chain_hash, process_result, &cx);
assert_eq!(bl.parent_lookups.len(), 0); assert_eq!(bl.parent_lookups.len(), 0);
} }
mod deneb_only { mod deneb_only {
use super::*; use super::*;
use crate::sync::block_lookups::common::ResponseType;
use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::AvailabilityCheckError;
use std::ops::IndexMut; use std::ops::IndexMut;
use std::str::FromStr; use std::str::FromStr;
@ -1156,10 +1239,10 @@ mod deneb_only {
parent_block: Option<Arc<SignedBeaconBlock<E>>>, parent_block: Option<Arc<SignedBeaconBlock<E>>>,
parent_blobs: Vec<Arc<BlobSidecar<E>>>, parent_blobs: Vec<Arc<BlobSidecar<E>>>,
peer_id: PeerId, peer_id: PeerId,
block_req_id: Option<u32>, block_req_id: Option<SingleLookupReqId>,
parent_block_req_id: Option<u32>, parent_block_req_id: Option<SingleLookupReqId>,
blob_req_id: Option<u32>, blob_req_id: Option<SingleLookupReqId>,
parent_blob_req_id: Option<u32>, parent_blob_req_id: Option<SingleLookupReqId>,
slot: Slot, slot: Slot,
block_root: Hash256, block_root: Hash256,
} }
@ -1202,8 +1285,8 @@ mod deneb_only {
PeerShouldHave::BlockAndBlobs(peer_id), PeerShouldHave::BlockAndBlobs(peer_id),
&mut cx, &mut cx,
); );
let block_req_id = rig.expect_block_request(ResponseType::Block); let block_req_id = rig.expect_lookup_request(ResponseType::Block);
let blob_req_id = rig.expect_block_request(ResponseType::Blob); let blob_req_id = rig.expect_lookup_request(ResponseType::Blob);
(Some(block_req_id), Some(blob_req_id), None, None) (Some(block_req_id), Some(blob_req_id), None, None)
} }
RequestTrigger::GossipUnknownParentBlock => { RequestTrigger::GossipUnknownParentBlock => {
@ -1223,12 +1306,12 @@ mod deneb_only {
block_root = child_root; block_root = child_root;
bl.search_child_block( bl.search_child_block(
child_root, child_root,
Some(UnknownParentComponents::new(Some(child_block), None)), Some(CachedChildComponents::new(Some(child_block), None)),
&[PeerShouldHave::Neither(peer_id)], &[PeerShouldHave::Neither(peer_id)],
&mut cx, &mut cx,
); );
let blob_req_id = rig.expect_block_request(ResponseType::Blob); let blob_req_id = rig.expect_lookup_request(ResponseType::Blob);
rig.expect_empty_network(); // expect no block request rig.expect_empty_network(); // expect no block request
bl.search_parent(slot, child_root, parent_root, peer_id, &mut cx); bl.search_parent(slot, child_root, parent_root, peer_id, &mut cx);
let parent_block_req_id = rig.expect_parent_request(ResponseType::Block); let parent_block_req_id = rig.expect_parent_request(ResponseType::Block);
@ -1261,13 +1344,13 @@ mod deneb_only {
*blobs.index_mut(0) = Some(child_blob); *blobs.index_mut(0) = Some(child_blob);
bl.search_child_block( bl.search_child_block(
child_root, child_root,
Some(UnknownParentComponents::new(None, Some(blobs))), Some(CachedChildComponents::new(None, Some(blobs))),
&[PeerShouldHave::Neither(peer_id)], &[PeerShouldHave::Neither(peer_id)],
&mut cx, &mut cx,
); );
let block_req_id = rig.expect_block_request(ResponseType::Block); let block_req_id = rig.expect_lookup_request(ResponseType::Block);
let blobs_req_id = rig.expect_block_request(ResponseType::Blob); let blobs_req_id = rig.expect_lookup_request(ResponseType::Blob);
rig.expect_empty_network(); // expect no block request rig.expect_empty_network(); // expect no block request
bl.search_parent(slot, child_root, parent_root, peer_id, &mut cx); bl.search_parent(slot, child_root, parent_root, peer_id, &mut cx);
let parent_block_req_id = rig.expect_parent_request(ResponseType::Block); let parent_block_req_id = rig.expect_parent_request(ResponseType::Block);
@ -1281,8 +1364,8 @@ mod deneb_only {
} }
RequestTrigger::GossipUnknownBlockOrBlob => { RequestTrigger::GossipUnknownBlockOrBlob => {
bl.search_block(block_root, PeerShouldHave::Neither(peer_id), &mut cx); bl.search_block(block_root, PeerShouldHave::Neither(peer_id), &mut cx);
let block_req_id = rig.expect_block_request(ResponseType::Block); let block_req_id = rig.expect_lookup_request(ResponseType::Block);
let blob_req_id = rig.expect_block_request(ResponseType::Blob); let blob_req_id = rig.expect_lookup_request(ResponseType::Blob);
(Some(block_req_id), Some(blob_req_id), None, None) (Some(block_req_id), Some(blob_req_id), None, None)
} }
}; };
@ -1307,12 +1390,12 @@ mod deneb_only {
fn parent_block_response(mut self) -> Self { fn parent_block_response(mut self) -> Self {
self.rig.expect_empty_network(); self.rig.expect_empty_network();
self.bl.parent_lookup_response( self.bl.parent_lookup_response::<BlockRequestState<Parent>>(
self.parent_block_req_id.expect("parent request id"), self.parent_block_req_id.expect("parent request id"),
self.peer_id, self.peer_id,
self.parent_block.clone(), self.parent_block.clone(),
D, D,
&mut self.cx, &self.cx,
); );
assert_eq!(self.bl.parent_lookups.len(), 1); assert_eq!(self.bl.parent_lookups.len(), 1);
@ -1321,21 +1404,23 @@ mod deneb_only {
fn parent_blob_response(mut self) -> Self { fn parent_blob_response(mut self) -> Self {
for blob in &self.parent_blobs { for blob in &self.parent_blobs {
self.bl.parent_lookup_blob_response( self.bl
.parent_lookup_response::<BlobRequestState<Parent, E>>(
self.parent_blob_req_id.expect("parent blob request id"), self.parent_blob_req_id.expect("parent blob request id"),
self.peer_id, self.peer_id,
Some(blob.clone()), Some(blob.clone()),
D, D,
&mut self.cx, &self.cx,
); );
assert_eq!(self.bl.parent_lookups.len(), 1); assert_eq!(self.bl.parent_lookups.len(), 1);
} }
self.bl.parent_lookup_blob_response( self.bl
.parent_lookup_response::<BlobRequestState<Parent, E>>(
self.parent_blob_req_id.expect("blob request id"), self.parent_blob_req_id.expect("blob request id"),
self.peer_id, self.peer_id,
None, None,
D, D,
&mut self.cx, &self.cx,
); );
self self
@ -1353,12 +1438,13 @@ mod deneb_only {
fn block_response(mut self) -> Self { fn block_response(mut self) -> Self {
// The peer provides the correct block, should not be penalized. Now the block should be sent // The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing. // for processing.
self.bl.single_block_lookup_response( self.bl
.single_lookup_response::<BlockRequestState<Current>>(
self.block_req_id.expect("block request id"), self.block_req_id.expect("block request id"),
self.peer_id, self.peer_id,
self.block.clone(), self.block.clone(),
D, D,
&mut self.cx, &self.cx,
); );
self.rig.expect_empty_network(); self.rig.expect_empty_network();
@ -1369,21 +1455,23 @@ mod deneb_only {
fn blobs_response(mut self) -> Self { fn blobs_response(mut self) -> Self {
for blob in &self.blobs { for blob in &self.blobs {
self.bl.single_blob_lookup_response( self.bl
.single_lookup_response::<BlobRequestState<Current, E>>(
self.blob_req_id.expect("blob request id"), self.blob_req_id.expect("blob request id"),
self.peer_id, self.peer_id,
Some(blob.clone()), Some(blob.clone()),
D, D,
&mut self.cx, &self.cx,
); );
assert_eq!(self.bl.single_block_lookups.len(), 1); assert_eq!(self.bl.single_block_lookups.len(), 1);
} }
self.bl.single_blob_lookup_response( self.bl
.single_lookup_response::<BlobRequestState<Current, E>>(
self.blob_req_id.expect("blob request id"), self.blob_req_id.expect("blob request id"),
self.peer_id, self.peer_id,
None, None,
D, D,
&mut self.cx, &self.cx,
); );
self self
} }
@ -1402,45 +1490,48 @@ mod deneb_only {
} }
fn empty_block_response(mut self) -> Self { fn empty_block_response(mut self) -> Self {
self.bl.single_block_lookup_response( self.bl
.single_lookup_response::<BlockRequestState<Current>>(
self.block_req_id.expect("block request id"), self.block_req_id.expect("block request id"),
self.peer_id, self.peer_id,
None, None,
D, D,
&mut self.cx, &self.cx,
); );
self self
} }
fn empty_blobs_response(mut self) -> Self { fn empty_blobs_response(mut self) -> Self {
self.bl.single_blob_lookup_response( self.bl
.single_lookup_response::<BlobRequestState<Current, E>>(
self.blob_req_id.expect("blob request id"), self.blob_req_id.expect("blob request id"),
self.peer_id, self.peer_id,
None, None,
D, D,
&mut self.cx, &self.cx,
); );
self self
} }
fn empty_parent_block_response(mut self) -> Self { fn empty_parent_block_response(mut self) -> Self {
self.bl.parent_lookup_response( self.bl.parent_lookup_response::<BlockRequestState<Parent>>(
self.parent_block_req_id.expect("block request id"), self.parent_block_req_id.expect("block request id"),
self.peer_id, self.peer_id,
None, None,
D, D,
&mut self.cx, &self.cx,
); );
self self
} }
fn empty_parent_blobs_response(mut self) -> Self { fn empty_parent_blobs_response(mut self) -> Self {
self.bl.parent_lookup_blob_response( self.bl
.parent_lookup_response::<BlobRequestState<Parent, E>>(
self.parent_blob_req_id.expect("blob request id"), self.parent_blob_req_id.expect("blob request id"),
self.peer_id, self.peer_id,
None, None,
D, D,
&mut self.cx, &self.cx,
); );
self self
} }
@ -1448,10 +1539,12 @@ mod deneb_only {
fn block_imported(mut self) -> Self { fn block_imported(mut self) -> Self {
// Missing blobs should be the request is not removed, the outstanding blobs request should // Missing blobs should be the request is not removed, the outstanding blobs request should
// mean we do not send a new request. // mean we do not send a new request.
self.bl.single_block_component_processed( self.bl
self.block_req_id.expect("block request id"), .single_block_component_processed::<BlockRequestState<Current>>(
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), self.block_req_id.expect("block request id").id,
ResponseType::Block, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(
self.block_root,
)),
&mut self.cx, &mut self.cx,
); );
self.rig.expect_empty_network(); self.rig.expect_empty_network();
@ -1463,7 +1556,6 @@ mod deneb_only {
self.bl.parent_block_processed( self.bl.parent_block_processed(
self.block_root, self.block_root,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)),
ResponseType::Block,
&mut self.cx, &mut self.cx,
); );
self.rig.expect_empty_network(); self.rig.expect_empty_network();
@ -1477,7 +1569,6 @@ mod deneb_only {
BlockProcessingResult::Err(BlockError::ParentUnknown(RpcBlock::new_without_blobs( BlockProcessingResult::Err(BlockError::ParentUnknown(RpcBlock::new_without_blobs(
self.parent_block.clone().expect("parent block"), self.parent_block.clone().expect("parent block"),
))), ))),
ResponseType::Block,
&mut self.cx, &mut self.cx,
); );
assert_eq!(self.bl.parent_lookups.len(), 1); assert_eq!(self.bl.parent_lookups.len(), 1);
@ -1488,7 +1579,6 @@ mod deneb_only {
self.bl.parent_block_processed( self.bl.parent_block_processed(
self.block_root, self.block_root,
BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid),
ResponseType::Block,
&mut self.cx, &mut self.cx,
); );
assert_eq!(self.bl.parent_lookups.len(), 1); assert_eq!(self.bl.parent_lookups.len(), 1);
@ -1496,10 +1586,10 @@ mod deneb_only {
} }
fn invalid_block_processed(mut self) -> Self { fn invalid_block_processed(mut self) -> Self {
self.bl.single_block_component_processed( self.bl
self.block_req_id.expect("block request id"), .single_block_component_processed::<BlockRequestState<Current>>(
self.block_req_id.expect("block request id").id,
BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid),
ResponseType::Block,
&mut self.cx, &mut self.cx,
); );
assert_eq!(self.bl.single_block_lookups.len(), 1); assert_eq!(self.bl.single_block_lookups.len(), 1);
@ -1507,12 +1597,12 @@ mod deneb_only {
} }
fn invalid_blob_processed(mut self) -> Self { fn invalid_blob_processed(mut self) -> Self {
self.bl.single_block_component_processed( self.bl
self.blob_req_id.expect("blob request id"), .single_block_component_processed::<BlobRequestState<Current, E>>(
self.blob_req_id.expect("blob request id").id,
BlockProcessingResult::Err(BlockError::AvailabilityCheck( BlockProcessingResult::Err(BlockError::AvailabilityCheck(
AvailabilityCheckError::KzgVerificationFailed, AvailabilityCheckError::KzgVerificationFailed,
)), )),
ResponseType::Blob,
&mut self.cx, &mut self.cx,
); );
assert_eq!(self.bl.single_block_lookups.len(), 1); assert_eq!(self.bl.single_block_lookups.len(), 1);
@ -1520,13 +1610,13 @@ mod deneb_only {
} }
fn missing_components_from_block_request(mut self) -> Self { fn missing_components_from_block_request(mut self) -> Self {
self.bl.single_block_component_processed( self.bl
self.block_req_id.expect("block request id"), .single_block_component_processed::<BlockRequestState<Current>>(
self.block_req_id.expect("block request id").id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
self.slot, self.slot,
self.block_root, self.block_root,
)), )),
ResponseType::Block,
&mut self.cx, &mut self.cx,
); );
assert_eq!(self.bl.single_block_lookups.len(), 1); assert_eq!(self.bl.single_block_lookups.len(), 1);
@ -1534,13 +1624,13 @@ mod deneb_only {
} }
fn missing_components_from_blob_request(mut self) -> Self { fn missing_components_from_blob_request(mut self) -> Self {
self.bl.single_block_component_processed( self.bl
self.blob_req_id.expect("blob request id"), .single_block_component_processed::<BlobRequestState<Current, E>>(
self.blob_req_id.expect("blob request id").id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
self.slot, self.slot,
self.block_root, self.block_root,
)), )),
ResponseType::Blob,
&mut self.cx, &mut self.cx,
); );
assert_eq!(self.bl.single_block_lookups.len(), 1); assert_eq!(self.bl.single_block_lookups.len(), 1);
@ -1556,12 +1646,12 @@ mod deneb_only {
self self
} }
fn expect_block_request(mut self) -> Self { fn expect_block_request(mut self) -> Self {
let id = self.rig.expect_block_request(ResponseType::Block); let id = self.rig.expect_lookup_request(ResponseType::Block);
self.block_req_id = Some(id); self.block_req_id = Some(id);
self self
} }
fn expect_blobs_request(mut self) -> Self { fn expect_blobs_request(mut self) -> Self {
let id = self.rig.expect_block_request(ResponseType::Blob); let id = self.rig.expect_lookup_request(ResponseType::Blob);
self.blob_req_id = Some(id); self.blob_req_id = Some(id);
self self
} }

View File

@ -41,10 +41,10 @@ use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
use crate::service::NetworkMessage; use crate::service::NetworkMessage;
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use crate::sync::block_lookups::common::{Current, Parent};
use crate::sync::block_lookups::delayed_lookup; use crate::sync::block_lookups::delayed_lookup;
use crate::sync::block_lookups::delayed_lookup::DelayedLookupMessage; use crate::sync::block_lookups::delayed_lookup::DelayedLookupMessage;
pub use crate::sync::block_lookups::ResponseType; use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, CachedChildComponents};
use crate::sync::block_lookups::UnknownParentComponents;
use crate::sync::range_sync::ByRangeRequestType; use crate::sync::range_sync::ByRangeRequestType;
use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
@ -83,13 +83,25 @@ pub const DELAY_QUEUE_CHANNEL_SIZE: usize = 128;
pub type Id = u32; pub type Id = u32;
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct SingleLookupReqId {
pub id: Id,
pub req_counter: Id,
}
/// Id of rpc requests sent by sync to the network. /// Id of rpc requests sent by sync to the network.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RequestId { pub enum RequestId {
/// Request searching for a block given a hash. /// Request searching for a block given a hash.
SingleBlock { id: Id }, SingleBlock { id: SingleLookupReqId },
/// Request searching for a block's parent. The id is the chain /// Request searching for a set of blobs given a hash.
ParentLookup { id: Id }, SingleBlob { id: SingleLookupReqId },
/// Request searching for a block's parent. The id is the chain, share with the corresponding
/// blob id.
ParentLookup { id: SingleLookupReqId },
/// Request searching for a block's parent blobs. The id is the chain, shared with the corresponding
/// block id.
ParentLookupBlob { id: SingleLookupReqId },
/// Request was from the backfill sync algorithm. /// Request was from the backfill sync algorithm.
BackFillBlocks { id: Id }, BackFillBlocks { id: Id },
/// Backfill request that is composed by both a block range request and a blob range request. /// Backfill request that is composed by both a block range request and a blob range request.
@ -100,10 +112,6 @@ pub enum RequestId {
RangeBlockAndBlobs { id: Id }, RangeBlockAndBlobs { id: Id },
} }
// TODO(diva) I'm updating functions what at a time, but this should be revisited because I think
// some code paths that are split for blobs and blocks can be made just one after sync as a whole
// is updated.
#[derive(Debug)] #[derive(Debug)]
/// A message that can be sent to the sync manager thread. /// A message that can be sent to the sync manager thread.
pub enum SyncMessage<T: EthSpec> { pub enum SyncMessage<T: EthSpec> {
@ -166,7 +174,6 @@ pub enum SyncMessage<T: EthSpec> {
BlockComponentProcessed { BlockComponentProcessed {
process_type: BlockProcessType, process_type: BlockProcessType,
result: BlockProcessingResult<T>, result: BlockProcessingResult<T>,
response_type: ResponseType,
}, },
} }
@ -174,6 +181,7 @@ pub enum SyncMessage<T: EthSpec> {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum BlockProcessType { pub enum BlockProcessType {
SingleBlock { id: Id }, SingleBlock { id: Id },
SingleBlob { id: Id },
ParentLookup { chain_hash: Hash256 }, ParentLookup { chain_hash: Hash256 },
} }
@ -324,16 +332,40 @@ impl<T: BeaconChainTypes> SyncManager<T> {
trace!(self.log, "Sync manager received a failed RPC"); trace!(self.log, "Sync manager received a failed RPC");
match request_id { match request_id {
RequestId::SingleBlock { id } => { RequestId::SingleBlock { id } => {
self.block_lookups.single_block_lookup_failed( self.block_lookups
.single_block_lookup_failed::<BlockRequestState<Current>>(
id, id,
&peer_id, &peer_id,
&mut self.network, &self.network,
error,
);
}
RequestId::SingleBlob { id } => {
self.block_lookups
.single_block_lookup_failed::<BlobRequestState<Current, T::EthSpec>>(
id,
&peer_id,
&self.network,
error, error,
); );
} }
RequestId::ParentLookup { id } => { RequestId::ParentLookup { id } => {
self.block_lookups self.block_lookups
.parent_lookup_failed(id, peer_id, &mut self.network, error); .parent_lookup_failed::<BlockRequestState<Parent>>(
id,
peer_id,
&self.network,
error,
);
}
RequestId::ParentLookupBlob { id } => {
self.block_lookups
.parent_lookup_failed::<BlobRequestState<Parent, T::EthSpec>>(
id,
peer_id,
&self.network,
error,
);
} }
RequestId::BackFillBlocks { id } => { RequestId::BackFillBlocks { id } => {
if let Some(batch_id) = self if let Some(batch_id) = self
@ -628,6 +660,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
let block_root = blob.block_root; let block_root = blob.block_root;
let parent_root = blob.block_parent_root; let parent_root = blob.block_parent_root;
let blob_index = blob.index; let blob_index = blob.index;
if blob_index >= T::EthSpec::max_blobs_per_block() as u64 {
warn!(self.log, "Peer sent blob with invalid index"; "index" => blob_index, "peer_id" => %peer_id);
return;
}
let mut blobs = FixedBlobSidecarList::default(); let mut blobs = FixedBlobSidecarList::default();
*blobs.index_mut(blob_index as usize) = Some(blob); *blobs.index_mut(blob_index as usize) = Some(blob);
self.handle_unknown_parent( self.handle_unknown_parent(
@ -635,7 +671,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
block_root, block_root,
parent_root, parent_root,
blob_slot, blob_slot,
Some(UnknownParentComponents::new(None, Some(blobs))), Some(CachedChildComponents::new(None, Some(blobs))),
); );
} }
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => { SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => {
@ -652,8 +688,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// If we are not synced, ignore this block. // If we are not synced, ignore this block.
if self.synced_and_connected(&peer_id) { if self.synced_and_connected(&peer_id) {
if self.should_delay_lookup(slot) { if self.should_delay_lookup(slot) {
self.block_lookups self.block_lookups.search_block_delayed(
.search_block_delayed(block_root, PeerShouldHave::Neither(peer_id)); block_root,
PeerShouldHave::Neither(peer_id),
&mut self.network,
);
if let Err(e) = self if let Err(e) = self
.delayed_lookups .delayed_lookups
.try_send(DelayedLookupMessage::MissingComponents(block_root)) .try_send(DelayedLookupMessage::MissingComponents(block_root))
@ -670,16 +709,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
} }
} }
SyncMessage::MissingGossipBlockComponentsDelayed(block_root) => { SyncMessage::MissingGossipBlockComponentsDelayed(block_root) => self
if self
.block_lookups .block_lookups
.trigger_lookup_by_root(block_root, &mut self.network) .trigger_lookup_by_root(block_root, &self.network),
.is_err()
{
// No request was made for block or blob so the lookup is dropped.
self.block_lookups.remove_lookup_by_root(block_root);
}
}
SyncMessage::Disconnect(peer_id) => { SyncMessage::Disconnect(peer_id) => {
self.peer_disconnect(&peer_id); self.peer_disconnect(&peer_id);
} }
@ -691,14 +723,24 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncMessage::BlockComponentProcessed { SyncMessage::BlockComponentProcessed {
process_type, process_type,
result, result,
response_type,
} => match process_type { } => match process_type {
BlockProcessType::SingleBlock { id } => self BlockProcessType::SingleBlock { id } => self
.block_lookups .block_lookups
.single_block_component_processed(id, result, response_type, &mut self.network), .single_block_component_processed::<BlockRequestState<Current>>(
id,
result,
&mut self.network,
),
BlockProcessType::SingleBlob { id } => self
.block_lookups
.single_block_component_processed::<BlobRequestState<Current, T::EthSpec>>(
id,
result,
&mut self.network,
),
BlockProcessType::ParentLookup { chain_hash } => self BlockProcessType::ParentLookup { chain_hash } => self
.block_lookups .block_lookups
.parent_block_processed(chain_hash, result, response_type, &mut self.network), .parent_block_processed(chain_hash, result, &mut self.network),
}, },
SyncMessage::BatchProcessed { sync_type, result } => match sync_type { SyncMessage::BatchProcessed { sync_type, result } => match sync_type {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
@ -727,7 +769,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
ChainSegmentProcessId::ParentLookup(chain_hash) => self ChainSegmentProcessId::ParentLookup(chain_hash) => self
.block_lookups .block_lookups
.parent_chain_processed(chain_hash, result, &mut self.network), .parent_chain_processed(chain_hash, result, &self.network),
}, },
} }
} }
@ -738,7 +780,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
block_root: Hash256, block_root: Hash256,
parent_root: Hash256, parent_root: Hash256,
slot: Slot, slot: Slot,
parent_components: Option<UnknownParentComponents<T::EthSpec>>, child_components: Option<CachedChildComponents<T::EthSpec>>,
) { ) {
if self.should_search_for_block(slot, &peer_id) { if self.should_search_for_block(slot, &peer_id) {
self.block_lookups.search_parent( self.block_lookups.search_parent(
@ -751,8 +793,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if self.should_delay_lookup(slot) { if self.should_delay_lookup(slot) {
self.block_lookups.search_child_delayed( self.block_lookups.search_child_delayed(
block_root, block_root,
parent_components, child_components,
&[PeerShouldHave::Neither(peer_id)], &[PeerShouldHave::Neither(peer_id)],
&mut self.network,
); );
if let Err(e) = self if let Err(e) = self
.delayed_lookups .delayed_lookups
@ -763,7 +806,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} else { } else {
self.block_lookups.search_child_block( self.block_lookups.search_child_block(
block_root, block_root,
parent_components, child_components,
&[PeerShouldHave::Neither(peer_id)], &[PeerShouldHave::Neither(peer_id)],
&mut self.network, &mut self.network,
); );
@ -883,20 +926,30 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration, seen_timestamp: Duration,
) { ) {
match request_id { match request_id {
RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( RequestId::SingleBlock { id } => self
.block_lookups
.single_lookup_response::<BlockRequestState<Current>>(
id, id,
peer_id, peer_id,
block, block,
seen_timestamp, seen_timestamp,
&mut self.network, &self.network,
), ),
RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( RequestId::SingleBlob { .. } => {
crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id );
}
RequestId::ParentLookup { id } => self
.block_lookups
.parent_lookup_response::<BlockRequestState<Parent>>(
id, id,
peer_id, peer_id,
block, block,
seen_timestamp, seen_timestamp,
&mut self.network, &self.network,
), ),
RequestId::ParentLookupBlob { id: _ } => {
crit!(self.log, "Block received during parent blob request"; "peer_id" => %peer_id );
}
RequestId::BackFillBlocks { id } => { RequestId::BackFillBlocks { id } => {
let is_stream_terminator = block.is_none(); let is_stream_terminator = block.is_none();
if let Some(batch_id) = self if let Some(batch_id) = self
@ -954,19 +1007,30 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration, seen_timestamp: Duration,
) { ) {
match request_id { match request_id {
RequestId::SingleBlock { id } => self.block_lookups.single_blob_lookup_response( RequestId::SingleBlock { .. } => {
crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id );
}
RequestId::SingleBlob { id } => self
.block_lookups
.single_lookup_response::<BlobRequestState<Current, T::EthSpec>>(
id, id,
peer_id, peer_id,
blob, blob,
seen_timestamp, seen_timestamp,
&mut self.network, &self.network,
), ),
RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_blob_response(
RequestId::ParentLookup { id: _ } => {
crit!(self.log, "Single blob received during parent block request"; "peer_id" => %peer_id );
}
RequestId::ParentLookupBlob { id } => self
.block_lookups
.parent_lookup_response::<BlobRequestState<Parent, T::EthSpec>>(
id, id,
peer_id, peer_id,
blob, blob,
seen_timestamp, seen_timestamp,
&mut self.network, &self.network,
), ),
RequestId::BackFillBlocks { id: _ } => { RequestId::BackFillBlocks { id: _ } => {
crit!(self.log, "Blob received during backfill block request"; "peer_id" => %peer_id ); crit!(self.log, "Blob received during backfill block request"; "peer_id" => %peer_id );

View File

@ -9,6 +9,6 @@ mod network_context;
mod peer_sync_info; mod peer_sync_info;
mod range_sync; mod range_sync;
pub use block_lookups::UnknownParentComponents; pub use block_lookups::CachedChildComponents;
pub use manager::{BatchProcessResult, SyncMessage}; pub use manager::{BatchProcessResult, SyncMessage};
pub use range_sync::{BatchOperationOutcome, ChainId}; pub use range_sync::{BatchOperationOutcome, ChainId};

View File

@ -7,7 +7,8 @@ use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::service::{NetworkMessage, RequestId}; use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use crate::sync::block_lookups::{BlobRequestId, BlockRequestId}; use crate::sync::block_lookups::common::LookupType;
use crate::sync::manager::SingleLookupReqId;
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use fnv::FnvHashMap; use fnv::FnvHashMap;
@ -62,7 +63,7 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>, pub chain: Arc<BeaconChain<T>>,
/// Logger for the `SyncNetworkContext`. /// Logger for the `SyncNetworkContext`.
log: slog::Logger, pub log: slog::Logger,
} }
/// Small enumeration to make dealing with block and blob requests easier. /// Small enumeration to make dealing with block and blob requests easier.
@ -118,11 +119,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.unwrap_or_default() .unwrap_or_default()
} }
pub fn status_peers<C: ToStatusMessage>( pub fn status_peers<C: ToStatusMessage>(&self, chain: &C, peers: impl Iterator<Item = PeerId>) {
&mut self,
chain: &C,
peers: impl Iterator<Item = PeerId>,
) {
let status_message = chain.status_message(); let status_message = chain.status_message();
for peer_id in peers { for peer_id in peers {
debug!( debug!(
@ -408,21 +405,26 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
} }
} }
/// Sends a blocks by root request for a parent request. pub fn block_lookup_request(
pub fn single_block_lookup_request( &self,
&mut self, id: SingleLookupReqId,
peer_id: PeerId, peer_id: PeerId,
request: BlocksByRootRequest, request: BlocksByRootRequest,
) -> Result<Id, &'static str> { lookup_type: LookupType,
let id = self.next_id(); ) -> Result<(), &'static str> {
let request_id = RequestId::Sync(SyncRequestId::SingleBlock { id }); let sync_id = match lookup_type {
LookupType::Current => SyncRequestId::SingleBlock { id },
LookupType::Parent => SyncRequestId::ParentLookup { id },
};
let request_id = RequestId::Sync(sync_id);
trace!( trace!(
self.log, self.log,
"Sending BlocksByRoot Request"; "Sending BlocksByRoot Request";
"method" => "BlocksByRoot", "method" => "BlocksByRoot",
"count" => request.block_roots().len(), "count" => request.block_roots().len(),
"peer" => %peer_id "peer" => %peer_id,
"lookup_type" => ?lookup_type
); );
self.send_network_msg(NetworkMessage::SendRequest { self.send_network_msg(NetworkMessage::SendRequest {
@ -430,82 +432,39 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: Request::BlocksByRoot(request), request: Request::BlocksByRoot(request),
request_id, request_id,
})?; })?;
Ok(id) Ok(())
} }
/// Sends a blobs by root request for a parent request. pub fn blob_lookup_request(
pub fn single_blobs_lookup_request( &self,
&mut self, id: SingleLookupReqId,
peer_id: PeerId, blob_peer_id: PeerId,
request: BlobsByRootRequest, blob_request: BlobsByRootRequest,
) -> Result<Id, &'static str> { lookup_type: LookupType,
let id = self.next_id(); ) -> Result<(), &'static str> {
let request_id = RequestId::Sync(SyncRequestId::SingleBlock { id }); let sync_id = match lookup_type {
LookupType::Current => SyncRequestId::SingleBlob { id },
LookupType::Parent => SyncRequestId::ParentLookupBlob { id },
};
let request_id = RequestId::Sync(sync_id);
if !blob_request.blob_ids.is_empty() {
trace!( trace!(
self.log, self.log,
"Sending BlobsByRoot Request"; "Sending BlobsByRoot Request";
"method" => "BlobsByRoot", "method" => "BlobsByRoot",
"count" => request.blob_ids.len(), "count" => blob_request.blob_ids.len(),
"peer" => %peer_id "peer" => %blob_peer_id,
"lookup_type" => ?lookup_type
); );
self.send_network_msg(NetworkMessage::SendRequest { self.send_network_msg(NetworkMessage::SendRequest {
peer_id, peer_id: blob_peer_id,
request: Request::BlobsByRoot(request), request: Request::BlobsByRoot(blob_request),
request_id, request_id,
})?; })?;
Ok(id)
} }
Ok(())
/// Sends a blocks by root request for a parent request.
pub fn parent_lookup_block_request(
&mut self,
peer_id: PeerId,
request: BlocksByRootRequest,
) -> Result<BlockRequestId, &'static str> {
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id });
trace!(
self.log,
"Sending parent BlocksByRoot Request";
"method" => "BlocksByRoot",
"count" => request.block_roots().len(),
"peer" => %peer_id
);
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: Request::BlocksByRoot(request),
request_id,
})?;
Ok(id)
}
/// Sends a blocks by root request for a parent request.
pub fn parent_lookup_blobs_request(
&mut self,
peer_id: PeerId,
request: BlobsByRootRequest,
) -> Result<BlobRequestId, &'static str> {
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id });
trace!(
self.log,
"Sending parent BlobsByRoot Request";
"method" => "BlobsByRoot",
"count" => request.blob_ids.len(),
"peer" => %peer_id
);
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: Request::BlobsByRoot(request),
request_id,
})?;
Ok(id)
} }
pub fn is_execution_engine_online(&self) -> bool { pub fn is_execution_engine_online(&self) -> bool {
@ -532,7 +491,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
} }
/// Reports to the scoring algorithm the behaviour of a peer. /// Reports to the scoring algorithm the behaviour of a peer.
pub fn report_peer(&mut self, peer_id: PeerId, action: PeerAction, msg: &'static str) { pub fn report_peer(&self, peer_id: PeerId, action: PeerAction, msg: &'static str) {
debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action); debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action);
self.network_send self.network_send
.send(NetworkMessage::ReportPeer { .send(NetworkMessage::ReportPeer {
@ -547,7 +506,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
} }
/// Subscribes to core topics. /// Subscribes to core topics.
pub fn subscribe_core_topics(&mut self) { pub fn subscribe_core_topics(&self) {
self.network_send self.network_send
.send(NetworkMessage::SubscribeCoreTopics) .send(NetworkMessage::SubscribeCoreTopics)
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
@ -556,7 +515,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
} }
/// Sends an arbitrary network message. /// Sends an arbitrary network message.
fn send_network_msg(&mut self, msg: NetworkMessage<T::EthSpec>) -> Result<(), &'static str> { fn send_network_msg(&self, msg: NetworkMessage<T::EthSpec>) -> Result<(), &'static str> {
self.network_send.send(msg).map_err(|_| { self.network_send.send(msg).map_err(|_| {
debug!(self.log, "Could not send message to the network service"); debug!(self.log, "Could not send message to the network service");
"Network channel send Failed" "Network channel send Failed"
@ -572,7 +531,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&self.network_beacon_processor &self.network_beacon_processor
} }
fn next_id(&mut self) -> Id { pub(crate) fn next_id(&mut self) -> Id {
let id = self.request_id; let id = self.request_id;
self.request_id += 1; self.request_id += 1;
id id
@ -587,7 +546,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
const _: () = assert!( const _: () = assert!(
super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1 super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1
&& super::range_sync::EPOCHS_PER_BATCH == 1, && super::range_sync::EPOCHS_PER_BATCH == 1,
"To deal with alignment with 4844 boundaries, batches need to be of just one epoch" "To deal with alignment with deneb boundaries, batches need to be of just one epoch"
); );
#[cfg(test)] #[cfg(test)]
@ -596,7 +555,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
ByRangeRequestType::Blocks ByRangeRequestType::Blocks
} }
#[cfg(not(test))] #[cfg(not(test))]
{
if let Some(data_availability_boundary) = self.chain.data_availability_boundary() { if let Some(data_availability_boundary) = self.chain.data_availability_boundary() {
if epoch >= data_availability_boundary { if epoch >= data_availability_boundary {
ByRangeRequestType::BlocksAndBlobs ByRangeRequestType::BlocksAndBlobs
@ -608,4 +566,3 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
} }
} }
} }
}