Remove delayed lookups (#4992)

* initial rip out

* fix unused imports

* delete tests and fix lint

* fix peers scoring for blobs
This commit is contained in:
realbigsean 2023-12-08 15:42:55 -05:00 committed by GitHub
parent b882519d2f
commit 46184e5ce4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 94 additions and 653 deletions

View File

@ -413,31 +413,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.incomplete_processing_components(slot) .incomplete_processing_components(slot)
} }
/// Determines whether we are at least the `single_lookup_delay` duration into the given slot.
/// If we are not currently in the Deneb fork, this delay is not considered.
///
/// The `single_lookup_delay` is the duration we wait for a blocks or blobs to arrive over
/// gossip before making single block or blob requests. This is to minimize the number of
/// single lookup requests we end up making.
pub fn should_delay_lookup(&self, slot: Slot) -> bool {
if !self.is_deneb() {
return false;
}
let current_or_future_slot = self
.slot_clock
.now()
.map_or(false, |current_slot| current_slot <= slot);
let delay_threshold_unmet = self
.slot_clock
.millis_from_current_slot_start()
.map_or(false, |millis_into_slot| {
millis_into_slot < self.slot_clock.single_lookup_delay()
});
current_or_future_slot && delay_threshold_unmet
}
/// The epoch at which we require a data availability check in block processing. /// The epoch at which we require a data availability check in block processing.
/// `None` if the `Deneb` fork is disabled. /// `None` if the `Deneb` fork is disabled.
pub fn data_availability_boundary(&self) -> Option<Epoch> { pub fn data_availability_boundary(&self) -> Option<Epoch> {

View File

@ -4,8 +4,6 @@ use crate::{
service::NetworkMessage, service::NetworkMessage,
sync::SyncMessage, sync::SyncMessage,
}; };
use std::collections::HashSet;
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::store::Error; use beacon_chain::store::Error;
@ -756,11 +754,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let blob_slot = verified_blob.slot(); let blob_slot = verified_blob.slot();
let blob_index = verified_blob.id().index; let blob_index = verified_blob.id().index;
let delay_lookup = self
.chain
.data_availability_checker
.should_delay_lookup(blob_slot);
match self.chain.process_gossip_blob(verified_blob).await { match self.chain.process_gossip_blob(verified_blob).await {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => { Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
// Note: Reusing block imported metric here // Note: Reusing block imported metric here
@ -772,29 +765,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
); );
self.chain.recompute_head_at_current_slot().await; self.chain.recompute_head_at_current_slot().await;
} }
Ok(AvailabilityProcessingStatus::MissingComponents(_slot, block_root)) => { Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
if delay_lookup { trace!(
self.cache_peer(peer_id, &block_root); self.log,
trace!( "Processed blob, waiting for other components";
self.log, "slot" => %slot,
"Processed blob, delaying lookup for other components"; "blob_index" => %blob_index,
"slot" => %blob_slot, "block_root" => %block_root,
"blob_index" => %blob_index, );
"block_root" => %block_root,
);
} else {
trace!(
self.log,
"Missing block components for gossip verified blob";
"slot" => %blob_slot,
"blob_index" => %blob_index,
"block_root" => %block_root,
);
self.send_sync_message(SyncMessage::MissingGossipBlockComponents(
vec![peer_id],
block_root,
));
}
} }
Err(err) => { Err(err) => {
debug!( debug!(
@ -818,18 +796,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
} }
} }
/// Cache the peer id for the given block root.
fn cache_peer(self: &Arc<Self>, peer_id: PeerId, block_root: &Hash256) {
let mut guard = self.delayed_lookup_peers.lock();
if let Some(peers) = guard.get_mut(block_root) {
peers.insert(peer_id);
} else {
let mut peers = HashSet::new();
peers.insert(peer_id);
guard.push(*block_root, peers);
}
}
/// Process the beacon block received from the gossip network and: /// Process the beacon block received from the gossip network and:
/// ///
/// - If it passes gossip propagation criteria, tell the network thread to forward it. /// - If it passes gossip propagation criteria, tell the network thread to forward it.
@ -1170,11 +1136,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block = verified_block.block.block_cloned(); let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root; let block_root = verified_block.block_root;
let delay_lookup = self
.chain
.data_availability_checker
.should_delay_lookup(verified_block.block.slot());
let result = self let result = self
.chain .chain
.process_block_with_early_caching(block_root, verified_block, NotifyExecutionLayer::Yes) .process_block_with_early_caching(block_root, verified_block, NotifyExecutionLayer::Yes)
@ -1209,26 +1170,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.chain.recompute_head_at_current_slot().await; self.chain.recompute_head_at_current_slot().await;
} }
Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
if delay_lookup { trace!(
self.cache_peer(peer_id, block_root); self.log,
trace!( "Processed block, waiting for other components";
self.log, "slot" => slot,
"Processed block, delaying lookup for other components"; "block_root" => %block_root,
"slot" => slot, );
"block_root" => %block_root,
);
} else {
trace!(
self.log,
"Missing block components for gossip verified block";
"slot" => slot,
"block_root" => %block_root,
);
self.send_sync_message(SyncMessage::MissingGossipBlockComponents(
vec![peer_id],
*block_root,
));
}
} }
Err(BlockError::ParentUnknown(block)) => { Err(BlockError::ParentUnknown(block)) => {
// Inform the sync manager to find parents for this block // Inform the sync manager to find parents for this block

View File

@ -18,11 +18,8 @@ use lighthouse_network::{
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
}; };
use lru::LruCache; use slog::{debug, Logger};
use parking_lot::Mutex; use slot_clock::ManualSlotClock;
use slog::{crit, debug, error, trace, Logger};
use slot_clock::{ManualSlotClock, SlotClock};
use std::collections::HashSet;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -30,7 +27,6 @@ use store::MemoryStore;
use task_executor::test_utils::TestRuntime; use task_executor::test_utils::TestRuntime;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, error::TrySendError}; use tokio::sync::mpsc::{self, error::TrySendError};
use tokio::time::{interval_at, Instant};
use types::*; use types::*;
pub use sync_methods::ChainSegmentProcessId; pub use sync_methods::ChainSegmentProcessId;
@ -44,7 +40,6 @@ mod sync_methods;
mod tests; mod tests;
pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;
pub const DELAYED_PEER_CACHE_SIZE: usize = 16;
/// Defines if and where we will store the SSZ files of invalid blocks. /// Defines if and where we will store the SSZ files of invalid blocks.
#[derive(Clone)] #[derive(Clone)]
@ -65,7 +60,6 @@ pub struct NetworkBeaconProcessor<T: BeaconChainTypes> {
pub reprocess_tx: mpsc::Sender<ReprocessQueueMessage>, pub reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
pub network_globals: Arc<NetworkGlobals<T::EthSpec>>, pub network_globals: Arc<NetworkGlobals<T::EthSpec>>,
pub invalid_block_storage: InvalidBlockStorage, pub invalid_block_storage: InvalidBlockStorage,
pub delayed_lookup_peers: Mutex<LruCache<Hash256, HashSet<PeerId>>>,
pub executor: TaskExecutor, pub executor: TaskExecutor,
pub log: Logger, pub log: Logger,
} }
@ -630,68 +624,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"error" => %e) "error" => %e)
}); });
} }
/// This service is responsible for collecting lookup messages and sending them back to sync
/// for processing after a short delay.
///
/// We want to delay lookups triggered from gossip for the following reasons:
///
/// - We only want to make one request for components we are unlikely to see on gossip. This means
/// we don't have to repeatedly update our RPC request's state as we receive gossip components.
///
/// - We are likely to receive blocks/blobs over gossip more quickly than we could via an RPC request.
///
/// - Delaying a lookup means we are less likely to simultaneously download the same blocks/blobs
/// over gossip and RPC.
///
/// - We would prefer to request peers based on whether we've seen them attest, because this gives
/// us an idea about whether they *should* have the block/blobs we're missing. This is because a
/// node should not attest to a block unless it has all the blobs for that block. This gives us a
/// stronger basis for peer scoring.
pub fn spawn_delayed_lookup_service(self: &Arc<Self>) {
let processor_clone = self.clone();
let executor = self.executor.clone();
let log = self.log.clone();
let beacon_chain = self.chain.clone();
executor.spawn(
async move {
let slot_duration = beacon_chain.slot_clock.slot_duration();
let delay = beacon_chain.slot_clock.single_lookup_delay();
let interval_start = match (
beacon_chain.slot_clock.duration_to_next_slot(),
beacon_chain.slot_clock.seconds_from_current_slot_start(),
) {
(Some(duration_to_next_slot), Some(seconds_from_current_slot_start)) => {
let duration_until_start = if seconds_from_current_slot_start > delay {
duration_to_next_slot + delay
} else {
delay - seconds_from_current_slot_start
};
Instant::now() + duration_until_start
}
_ => {
crit!(log,
"Failed to read slot clock, delayed lookup service timing will be inaccurate.\
This may degrade performance"
);
Instant::now()
}
};
let mut interval = interval_at(interval_start, slot_duration);
loop {
interval.tick().await;
let Some(slot) = beacon_chain.slot_clock.now_or_genesis() else {
error!(log, "Skipping delayed lookup poll, unable to read slot clock");
continue
};
trace!(log, "Polling delayed lookups for slot: {slot}");
processor_clone.poll_delayed_lookups(slot)
}
},
"delayed_lookups",
);
}
} }
type TestBeaconChainType<E> = type TestBeaconChainType<E> =
@ -734,7 +666,6 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
reprocess_tx: work_reprocessing_tx, reprocess_tx: work_reprocessing_tx,
network_globals, network_globals,
invalid_block_storage: InvalidBlockStorage::Disabled, invalid_block_storage: InvalidBlockStorage::Disabled,
delayed_lookup_peers: Mutex::new(LruCache::new(DELAYED_PEER_CACHE_SIZE)),
executor: runtime.task_executor.clone(), executor: runtime.task_executor.clone(),
log, log,
}; };

View File

@ -19,7 +19,7 @@ use beacon_processor::{
AsyncFn, BlockingFn, DuplicateCache, AsyncFn, BlockingFn, DuplicateCache,
}; };
use lighthouse_network::PeerAction; use lighthouse_network::PeerAction;
use slog::{debug, error, info, trace, warn}; use slog::{debug, error, info, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -28,7 +28,7 @@ use store::KzgCommitment;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::beacon_block_body::format_kzg_commitments; use types::beacon_block_body::format_kzg_commitments;
use types::blob_sidecar::FixedBlobSidecarList; use types::blob_sidecar::FixedBlobSidecarList;
use types::{Epoch, Hash256, Slot}; use types::{Epoch, Hash256};
/// Id associated to a batch processing request, either a sync batch or a parent lookup. /// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
@ -373,27 +373,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}); });
} }
/// Poll the beacon chain for any delayed lookups that are now available.
pub fn poll_delayed_lookups(&self, slot: Slot) {
let block_roots = self
.chain
.data_availability_checker
.incomplete_processing_components(slot);
if block_roots.is_empty() {
trace!(self.log, "No delayed lookups found on poll");
} else {
debug!(self.log, "Found delayed lookups on poll"; "lookup_count" => block_roots.len());
}
for block_root in block_roots {
if let Some(peer_ids) = self.delayed_lookup_peers.lock().pop(&block_root) {
self.send_sync_message(SyncMessage::MissingGossipBlockComponents(
peer_ids.into_iter().collect(),
block_root,
));
}
}
}
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it. /// thread if more blocks are needed to process it.
pub async fn process_chain_segment( pub async fn process_chain_segment(

View File

@ -1,7 +1,6 @@
#![cfg(not(debug_assertions))] // Tests are too slow in debug. #![cfg(not(debug_assertions))] // Tests are too slow in debug.
#![cfg(test)] #![cfg(test)]
use crate::network_beacon_processor::DELAYED_PEER_CACHE_SIZE;
use crate::{ use crate::{
network_beacon_processor::{ network_beacon_processor::{
ChainSegmentProcessId, DuplicateCache, InvalidBlockStorage, NetworkBeaconProcessor, ChainSegmentProcessId, DuplicateCache, InvalidBlockStorage, NetworkBeaconProcessor,
@ -24,8 +23,6 @@ use lighthouse_network::{
types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}, types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield},
Client, MessageId, NetworkGlobals, PeerId, Response, Client, MessageId, NetworkGlobals, PeerId, Response,
}; };
use lru::LruCache;
use parking_lot::Mutex;
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::iter::Iterator; use std::iter::Iterator;
use std::sync::Arc; use std::sync::Arc;
@ -223,7 +220,6 @@ impl TestRig {
reprocess_tx: work_reprocessing_tx.clone(), reprocess_tx: work_reprocessing_tx.clone(),
network_globals: network_globals.clone(), network_globals: network_globals.clone(),
invalid_block_storage: InvalidBlockStorage::Disabled, invalid_block_storage: InvalidBlockStorage::Disabled,
delayed_lookup_peers: Mutex::new(LruCache::new(DELAYED_PEER_CACHE_SIZE)),
executor: executor.clone(), executor: executor.clone(),
log: log.clone(), log: log.clone(),
}; };

View File

@ -21,8 +21,6 @@ use lighthouse_network::{
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
}; };
use logging::TimeLatch; use logging::TimeLatch;
use lru::LruCache;
use parking_lot::Mutex;
use slog::{crit, debug, o, trace}; use slog::{crit, debug, o, trace};
use slog::{error, warn}; use slog::{error, warn};
use std::sync::Arc; use std::sync::Arc;
@ -111,14 +109,10 @@ impl<T: BeaconChainTypes> Router<T> {
reprocess_tx: beacon_processor_reprocess_tx, reprocess_tx: beacon_processor_reprocess_tx,
network_globals: network_globals.clone(), network_globals: network_globals.clone(),
invalid_block_storage, invalid_block_storage,
delayed_lookup_peers: Mutex::new(LruCache::new(
crate::network_beacon_processor::DELAYED_PEER_CACHE_SIZE,
)),
executor: executor.clone(), executor: executor.clone(),
log: log.clone(), log: log.clone(),
}; };
let network_beacon_processor = Arc::new(network_beacon_processor); let network_beacon_processor = Arc::new(network_beacon_processor);
network_beacon_processor.spawn_delayed_lookup_service();
// spawn the sync thread // spawn the sync thread
crate::sync::manager::spawn( crate::sync::manager::spawn(

View File

@ -3,8 +3,7 @@ use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, LookupVerifyError, SingleBlockLookup, SingleLookupRequestState, State, LookupRequestError, LookupVerifyError, SingleBlockLookup, SingleLookupRequestState, State,
}; };
use crate::sync::block_lookups::{ use crate::sync::block_lookups::{
BlobRequestState, BlockLookups, BlockRequestState, PeerShouldHave, BlobRequestState, BlockLookups, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
}; };
use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId}; use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId};
use crate::sync::network_context::SyncNetworkContext; use crate::sync::network_context::SyncNetworkContext;
@ -13,7 +12,6 @@ use beacon_chain::data_availability_checker::{AvailabilityView, ChildComponents}
use beacon_chain::{get_block_root, BeaconChainTypes}; use beacon_chain::{get_block_root, BeaconChainTypes};
use lighthouse_network::rpc::methods::BlobsByRootRequest; use lighthouse_network::rpc::methods::BlobsByRootRequest;
use lighthouse_network::rpc::BlocksByRootRequest; use lighthouse_network::rpc::BlocksByRootRequest;
use lighthouse_network::PeerId;
use rand::prelude::IteratorRandom; use rand::prelude::IteratorRandom;
use ssz_types::VariableList; use ssz_types::VariableList;
use std::ops::IndexMut; use std::ops::IndexMut;
@ -89,7 +87,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
/* Request building methods */ /* Request building methods */
/// Construct a new request. /// Construct a new request.
fn build_request(&mut self) -> Result<(PeerShouldHave, Self::RequestType), LookupRequestError> { fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> {
// Verify and construct request. // Verify and construct request.
self.too_many_attempts()?; self.too_many_attempts()?;
let peer = self.get_peer()?; let peer = self.get_peer()?;
@ -121,7 +119,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
id, id,
req_counter: self.get_state().req_counter, req_counter: self.get_state().req_counter,
}; };
Self::make_request(id, peer_id.to_peer_id(), request, cx) Self::make_request(id, peer_id, request, cx)
} }
/// Verify the current request has not exceeded the maximum number of attempts. /// Verify the current request has not exceeded the maximum number of attempts.
@ -140,26 +138,15 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
/// Get the next peer to request. Draws from the set of peers we think should have both the /// Get the next peer to request. Draws from the set of peers we think should have both the
/// block and blob first. If that fails, we draw from the set of peers that may have either. /// block and blob first. If that fails, we draw from the set of peers that may have either.
fn get_peer(&mut self) -> Result<PeerShouldHave, LookupRequestError> { fn get_peer(&mut self) -> Result<PeerId, LookupRequestError> {
let request_state = self.get_state_mut(); let request_state = self.get_state_mut();
let available_peer_opt = request_state let peer_id = request_state
.available_peers .available_peers
.iter() .iter()
.choose(&mut rand::thread_rng()) .choose(&mut rand::thread_rng())
.copied() .copied()
.map(PeerShouldHave::BlockAndBlobs); .ok_or(LookupRequestError::NoPeers)?;
request_state.used_peers.insert(peer_id);
let Some(peer_id) = available_peer_opt.or_else(|| {
request_state
.potential_peers
.iter()
.choose(&mut rand::thread_rng())
.copied()
.map(PeerShouldHave::Neither)
}) else {
return Err(LookupRequestError::NoPeers);
};
request_state.used_peers.insert(peer_id.to_peer_id());
Ok(peer_id) Ok(peer_id)
} }
@ -211,7 +198,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
&mut self, &mut self,
expected_block_root: Hash256, expected_block_root: Hash256,
response: Option<Self::ResponseType>, response: Option<Self::ResponseType>,
peer_id: PeerShouldHave, peer_id: PeerId,
) -> Result<Option<Self::VerifiedResponseType>, LookupVerifyError>; ) -> Result<Option<Self::VerifiedResponseType>, LookupVerifyError>;
/// A getter for the parent root of the response. Returns an `Option` because we won't know /// A getter for the parent root of the response. Returns an `Option` because we won't know
@ -241,11 +228,6 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
cx: &SyncNetworkContext<T>, cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError>; ) -> Result<(), LookupRequestError>;
/// Remove the peer from the lookup if it is useless.
fn remove_if_useless(&mut self, peer: &PeerId) {
self.get_state_mut().remove_peer_if_useless(peer)
}
/// Register a failure to process the block or blob. /// Register a failure to process the block or blob.
fn register_failure_downloading(&mut self) { fn register_failure_downloading(&mut self) {
self.get_state_mut().register_failure_downloading() self.get_state_mut().register_failure_downloading()
@ -290,7 +272,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
&mut self, &mut self,
expected_block_root: Hash256, expected_block_root: Hash256,
response: Option<Self::ResponseType>, response: Option<Self::ResponseType>,
peer_id: PeerShouldHave, peer_id: PeerId,
) -> Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, LookupVerifyError> { ) -> Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, LookupVerifyError> {
match response { match response {
Some(block) => { Some(block) => {
@ -310,13 +292,8 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
} }
} }
None => { None => {
if peer_id.should_have_block() { self.state.register_failure_downloading();
self.state.register_failure_downloading(); Err(LookupVerifyError::NoBlockReturned)
Err(LookupVerifyError::NoBlockReturned)
} else {
self.state.state = State::AwaitingDownload;
Err(LookupVerifyError::BenignFailure)
}
} }
} }
} }
@ -396,7 +373,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,
&mut self, &mut self,
_expected_block_root: Hash256, _expected_block_root: Hash256,
blob: Option<Self::ResponseType>, blob: Option<Self::ResponseType>,
peer_id: PeerShouldHave, peer_id: PeerId,
) -> Result<Option<FixedBlobSidecarList<T::EthSpec>>, LookupVerifyError> { ) -> Result<Option<FixedBlobSidecarList<T::EthSpec>>, LookupVerifyError> {
match blob { match blob {
Some(blob) => { Some(blob) => {

View File

@ -7,9 +7,7 @@ use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId; use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::block_lookups::common::LookupType; use crate::sync::block_lookups::common::LookupType;
use crate::sync::block_lookups::parent_lookup::{ParentLookup, RequestError}; use crate::sync::block_lookups::parent_lookup::{ParentLookup, RequestError};
use crate::sync::block_lookups::single_block_lookup::{ use crate::sync::block_lookups::single_block_lookup::{CachedChild, LookupRequestError};
CachedChild, LookupRequestError, LookupVerifyError,
};
use crate::sync::manager::{Id, SingleLookupReqId}; use crate::sync::manager::{Id, SingleLookupReqId};
use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
pub use beacon_chain::data_availability_checker::ChildComponents; pub use beacon_chain::data_availability_checker::ChildComponents;
@ -30,11 +28,9 @@ pub use single_block_lookup::{BlobRequestState, BlockRequestState};
use slog::{debug, error, trace, warn, Logger}; use slog::{debug, error, trace, warn, Logger};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use store::Hash256; use store::Hash256;
use strum::Display;
use types::blob_sidecar::FixedBlobSidecarList; use types::blob_sidecar::FixedBlobSidecarList;
use types::Slot; use types::Slot;
@ -49,43 +45,6 @@ pub type DownloadedBlock<T> = (Hash256, RpcBlock<T>);
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;
/// This enum is used to track what a peer *should* be able to respond with based on
/// other messages we've seen from this peer on the network. This is useful for peer scoring.
/// We expect a peer tracked by the `BlockAndBlobs` variant to be able to respond to all
/// components of a block. This peer has either sent an attestation for the requested block
/// or has forwarded a block or blob that is a descendant of the requested block. An honest node
/// should not attest unless it has all components of a block, and it should not forward
/// messages if it does not have all components of the parent block. A peer tracked by the
/// `Neither` variant has likely just sent us a block or blob over gossip, in which case we
/// can't know whether the peer has all components of the block, and could be acting honestly
/// by forwarding a message without any other block components.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Display)]
pub enum PeerShouldHave {
BlockAndBlobs(PeerId),
Neither(PeerId),
}
impl PeerShouldHave {
fn as_peer_id(&self) -> &PeerId {
match self {
PeerShouldHave::BlockAndBlobs(id) => id,
PeerShouldHave::Neither(id) => id,
}
}
fn to_peer_id(self) -> PeerId {
match self {
PeerShouldHave::BlockAndBlobs(id) => id,
PeerShouldHave::Neither(id) => id,
}
}
fn should_have_block(&self) -> bool {
match self {
PeerShouldHave::BlockAndBlobs(_) => true,
PeerShouldHave::Neither(_) => false,
}
}
}
pub struct BlockLookups<T: BeaconChainTypes> { pub struct BlockLookups<T: BeaconChainTypes> {
/// Parent chain lookups being downloaded. /// Parent chain lookups being downloaded.
parent_lookups: SmallVec<[ParentLookup<T>; 3]>, parent_lookups: SmallVec<[ParentLookup<T>; 3]>,
@ -123,7 +82,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn search_block( pub fn search_block(
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
peer_source: &[PeerShouldHave], peer_source: &[PeerId],
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) { ) {
self.new_current_lookup(block_root, None, peer_source, cx) self.new_current_lookup(block_root, None, peer_source, cx)
@ -139,7 +98,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
child_components: ChildComponents<T::EthSpec>, child_components: ChildComponents<T::EthSpec>,
peer_source: &[PeerShouldHave], peer_source: &[PeerId],
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) { ) {
self.new_current_lookup(block_root, Some(child_components), peer_source, cx) self.new_current_lookup(block_root, Some(child_components), peer_source, cx)
@ -180,7 +139,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
child_components: Option<ChildComponents<T::EthSpec>>, child_components: Option<ChildComponents<T::EthSpec>>,
peers: &[PeerShouldHave], peers: &[PeerId],
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) { ) {
// Do not re-request a block that is already being requested // Do not re-request a block that is already being requested
@ -248,9 +207,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_id: PeerId, peer_id: PeerId,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) { ) {
// Gossip blocks or blobs shouldn't be propagated if parents are unavailable.
let peer_source = PeerShouldHave::BlockAndBlobs(peer_id);
// If this block or it's parent is part of a known failed chain, ignore it. // If this block or it's parent is part of a known failed chain, ignore it.
if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) { if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) {
debug!(self.log, "Block is from a past failed chain. Dropping"; debug!(self.log, "Block is from a past failed chain. Dropping";
@ -263,7 +219,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if let Some(parent_lookup) = self.parent_lookups.iter_mut().find(|parent_req| { if let Some(parent_lookup) = self.parent_lookups.iter_mut().find(|parent_req| {
parent_req.contains_block(&block_root) || parent_req.is_for_block(block_root) parent_req.contains_block(&block_root) || parent_req.is_for_block(block_root)
}) { }) {
parent_lookup.add_peer(peer_source); parent_lookup.add_peer(peer_id);
// we are already searching for this block, ignore it // we are already searching for this block, ignore it
return; return;
} }
@ -279,7 +235,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let parent_lookup = ParentLookup::new( let parent_lookup = ParentLookup::new(
block_root, block_root,
parent_root, parent_root,
peer_source, peer_id,
self.da_checker.clone(), self.da_checker.clone(),
cx, cx,
); );
@ -398,14 +354,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"response_type" => ?response_type, "response_type" => ?response_type,
"error" => ?e "error" => ?e
); );
if matches!(e, LookupVerifyError::BenignFailure) { let msg = e.into();
request_state cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
.get_state_mut()
.remove_peer_if_useless(&peer_id);
} else {
let msg = e.into();
cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
};
request_state.register_failure_downloading(); request_state.register_failure_downloading();
lookup.request_block_and_blobs(cx)?; lookup.request_block_and_blobs(cx)?;
@ -456,7 +406,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// we should penalize the blobs peer because they did not provide all blobs on the // we should penalize the blobs peer because they did not provide all blobs on the
// initial request. // initial request.
if lookup.both_components_downloaded() { if lookup.both_components_downloaded() {
lookup.penalize_blob_peer(false, cx); lookup.penalize_blob_peer(cx);
lookup lookup
.blob_request_state .blob_request_state
.state .state
@ -619,15 +569,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"bbroot_failed_chains", "bbroot_failed_chains",
); );
} }
ParentVerifyError::BenignFailure => {
trace!(
self.log,
"Requested peer could not respond to block request, requesting a new peer";
);
let request_state = R::request_state_mut(&mut parent_lookup.current_parent_request);
request_state.remove_if_useless(&peer_id);
parent_lookup.request_parent(cx)?;
}
} }
Ok(()) Ok(())
} }
@ -846,7 +787,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
request_state.get_state_mut().component_processed = true; request_state.get_state_mut().component_processed = true;
if lookup.both_components_processed() { if lookup.both_components_processed() {
lookup.penalize_blob_peer(false, cx); lookup.penalize_blob_peer(cx);
// Try it again if possible. // Try it again if possible.
lookup lookup
@ -864,7 +805,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self, &mut self,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
mut lookup: SingleBlockLookup<Current, T>, mut lookup: SingleBlockLookup<Current, T>,
peer_id: PeerShouldHave, peer_id: PeerId,
e: BlockError<T::EthSpec>, e: BlockError<T::EthSpec>,
) -> Result<Option<SingleBlockLookup<Current, T>>, LookupRequestError> { ) -> Result<Option<SingleBlockLookup<Current, T>>, LookupRequestError> {
let root = lookup.block_root(); let root = lookup.block_root();
@ -884,7 +825,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let parent_root = block.parent_root(); let parent_root = block.parent_root();
lookup.add_child_components(block.into()); lookup.add_child_components(block.into());
lookup.request_block_and_blobs(cx)?; lookup.request_block_and_blobs(cx)?;
self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx); self.search_parent(slot, root, parent_root, peer_id, cx);
} }
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline // These errors indicate that the execution layer is offline
@ -920,7 +861,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id);
if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() { if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() {
cx.report_peer( cx.report_peer(
block_peer.to_peer_id(), block_peer,
PeerAction::MidToleranceError, PeerAction::MidToleranceError,
"single_block_failure", "single_block_failure",
); );
@ -1141,13 +1082,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let Ok(block_peer_id) = parent_lookup.block_processing_peer() else { let Ok(block_peer_id) = parent_lookup.block_processing_peer() else {
return; return;
}; };
let block_peer_id = block_peer_id.to_peer_id();
// We may not have a blob peer, if there were no blobs required for this block. // We may not have a blob peer, if there were no blobs required for this block.
let blob_peer_id = parent_lookup let blob_peer_id = parent_lookup.blob_processing_peer().ok();
.blob_processing_peer()
.ok()
.map(PeerShouldHave::to_peer_id);
// all else we consider the chain a failure and downvote the peer that sent // all else we consider the chain a failure and downvote the peer that sent
// us the last block // us the last block

View File

@ -1,5 +1,5 @@
use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup}; use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup};
use super::{DownloadedBlock, PeerShouldHave}; use super::{DownloadedBlock, PeerId};
use crate::sync::block_lookups::common::Parent; use crate::sync::block_lookups::common::Parent;
use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::common::RequestState;
use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext};
@ -8,7 +8,6 @@ use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_availability_checker::{ChildComponents, DataAvailabilityChecker}; use beacon_chain::data_availability_checker::{ChildComponents, DataAvailabilityChecker};
use beacon_chain::BeaconChainTypes; use beacon_chain::BeaconChainTypes;
use itertools::Itertools; use itertools::Itertools;
use lighthouse_network::PeerId;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::sync::Arc; use std::sync::Arc;
use store::Hash256; use store::Hash256;
@ -41,7 +40,6 @@ pub enum ParentVerifyError {
ExtraBlobsReturned, ExtraBlobsReturned,
InvalidIndex(u64), InvalidIndex(u64),
PreviousFailure { parent_root: Hash256 }, PreviousFailure { parent_root: Hash256 },
BenignFailure,
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
@ -61,7 +59,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
pub fn new( pub fn new(
block_root: Hash256, block_root: Hash256,
parent_root: Hash256, parent_root: Hash256,
peer_id: PeerShouldHave, peer_id: PeerId,
da_checker: Arc<DataAvailabilityChecker<T>>, da_checker: Arc<DataAvailabilityChecker<T>>,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) -> Self { ) -> Self {
@ -126,14 +124,14 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
.update_requested_parent_block(next_parent) .update_requested_parent_block(next_parent)
} }
pub fn block_processing_peer(&self) -> Result<PeerShouldHave, ()> { pub fn block_processing_peer(&self) -> Result<PeerId, ()> {
self.current_parent_request self.current_parent_request
.block_request_state .block_request_state
.state .state
.processing_peer() .processing_peer()
} }
pub fn blob_processing_peer(&self) -> Result<PeerShouldHave, ()> { pub fn blob_processing_peer(&self) -> Result<PeerId, ()> {
self.current_parent_request self.current_parent_request
.blob_request_state .blob_request_state
.state .state
@ -211,12 +209,12 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
Ok(root_and_verified) Ok(root_and_verified)
} }
pub fn add_peer(&mut self, peer: PeerShouldHave) { pub fn add_peer(&mut self, peer: PeerId) {
self.current_parent_request.add_peer(peer) self.current_parent_request.add_peer(peer)
} }
/// Adds a list of peers to the parent request. /// Adds a list of peers to the parent request.
pub fn add_peers(&mut self, peers: &[PeerShouldHave]) { pub fn add_peers(&mut self, peers: &[PeerId]) {
self.current_parent_request.add_peers(peers) self.current_parent_request.add_peers(peers)
} }
@ -248,7 +246,6 @@ impl From<LookupVerifyError> for ParentVerifyError {
E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned, E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned,
E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index), E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index),
E::NotEnoughBlobsReturned => ParentVerifyError::NotEnoughBlobsReturned, E::NotEnoughBlobsReturned => ParentVerifyError::NotEnoughBlobsReturned,
E::BenignFailure => ParentVerifyError::BenignFailure,
} }
} }
} }

View File

@ -1,4 +1,4 @@
use super::PeerShouldHave; use super::PeerId;
use crate::sync::block_lookups::common::{Lookup, RequestState}; use crate::sync::block_lookups::common::{Lookup, RequestState};
use crate::sync::block_lookups::Id; use crate::sync::block_lookups::Id;
use crate::sync::network_context::SyncNetworkContext; use crate::sync::network_context::SyncNetworkContext;
@ -8,7 +8,7 @@ use beacon_chain::data_availability_checker::{
}; };
use beacon_chain::data_availability_checker::{AvailabilityView, ChildComponents}; use beacon_chain::data_availability_checker::{AvailabilityView, ChildComponents};
use beacon_chain::BeaconChainTypes; use beacon_chain::BeaconChainTypes;
use lighthouse_network::{PeerAction, PeerId}; use lighthouse_network::PeerAction;
use slog::{trace, Logger}; use slog::{trace, Logger};
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt::Debug; use std::fmt::Debug;
@ -22,8 +22,8 @@ use types::EthSpec;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum State { pub enum State {
AwaitingDownload, AwaitingDownload,
Downloading { peer_id: PeerShouldHave }, Downloading { peer_id: PeerId },
Processing { peer_id: PeerShouldHave }, Processing { peer_id: PeerId },
} }
#[derive(Debug, PartialEq, Eq, IntoStaticStr)] #[derive(Debug, PartialEq, Eq, IntoStaticStr)]
@ -35,10 +35,6 @@ pub enum LookupVerifyError {
ExtraBlobsReturned, ExtraBlobsReturned,
NotEnoughBlobsReturned, NotEnoughBlobsReturned,
InvalidIndex(u64), InvalidIndex(u64),
/// We don't have enough information to know
/// whether the peer is at fault or simply missed
/// what was requested on gossip.
BenignFailure,
} }
#[derive(Debug, PartialEq, Eq, IntoStaticStr)] #[derive(Debug, PartialEq, Eq, IntoStaticStr)]
@ -66,7 +62,7 @@ impl<L: Lookup, T: BeaconChainTypes> SingleBlockLookup<L, T> {
pub fn new( pub fn new(
requested_block_root: Hash256, requested_block_root: Hash256,
child_components: Option<ChildComponents<T::EthSpec>>, child_components: Option<ChildComponents<T::EthSpec>>,
peers: &[PeerShouldHave], peers: &[PeerId],
da_checker: Arc<DataAvailabilityChecker<T>>, da_checker: Arc<DataAvailabilityChecker<T>>,
id: Id, id: Id,
) -> Self { ) -> Self {
@ -191,21 +187,13 @@ impl<L: Lookup, T: BeaconChainTypes> SingleBlockLookup<L, T> {
} }
/// Add all given peers to both block and blob request states. /// Add all given peers to both block and blob request states.
pub fn add_peer(&mut self, peer: PeerShouldHave) { pub fn add_peer(&mut self, peer_id: PeerId) {
match peer { self.block_request_state.state.add_peer(&peer_id);
PeerShouldHave::BlockAndBlobs(peer_id) => { self.blob_request_state.state.add_peer(&peer_id);
self.block_request_state.state.add_peer(&peer_id);
self.blob_request_state.state.add_peer(&peer_id);
}
PeerShouldHave::Neither(peer_id) => {
self.block_request_state.state.add_potential_peer(&peer_id);
self.blob_request_state.state.add_potential_peer(&peer_id);
}
}
} }
/// Add all given peers to both block and blob request states. /// Add all given peers to both block and blob request states.
pub fn add_peers(&mut self, peers: &[PeerShouldHave]) { pub fn add_peers(&mut self, peers: &[PeerId]) {
for peer in peers { for peer in peers {
self.add_peer(*peer); self.add_peer(*peer);
} }
@ -293,38 +281,31 @@ impl<L: Lookup, T: BeaconChainTypes> SingleBlockLookup<L, T> {
} }
} }
/// Penalizes a blob peer if it should have blobs but didn't return them to us. Does not penalize /// Penalizes a blob peer if it should have blobs but didn't return them to us.
/// a peer who we request blobs from based on seeing a block or blobs over gossip. This may pub fn penalize_blob_peer(&mut self, cx: &SyncNetworkContext<T>) {
/// have been a benign failure.
pub fn penalize_blob_peer(&mut self, penalize_always: bool, cx: &SyncNetworkContext<T>) {
if let Ok(blob_peer) = self.blob_request_state.state.processing_peer() { if let Ok(blob_peer) = self.blob_request_state.state.processing_peer() {
if penalize_always || matches!(blob_peer, PeerShouldHave::BlockAndBlobs(_)) { cx.report_peer(
cx.report_peer( blob_peer,
blob_peer.to_peer_id(), PeerAction::MidToleranceError,
PeerAction::MidToleranceError, "single_blob_failure",
"single_blob_failure", );
);
}
self.blob_request_state
.state
.remove_peer_if_useless(blob_peer.as_peer_id());
} }
} }
/// This failure occurs on download, so register a failure downloading, penalize the peer if /// This failure occurs on download, so register a failure downloading, penalize the peer
/// necessary and clear the blob cache. /// and clear the blob cache.
pub fn handle_consistency_failure(&mut self, cx: &SyncNetworkContext<T>) { pub fn handle_consistency_failure(&mut self, cx: &SyncNetworkContext<T>) {
self.penalize_blob_peer(false, cx); self.penalize_blob_peer(cx);
if let Some(cached_child) = self.child_components.as_mut() { if let Some(cached_child) = self.child_components.as_mut() {
cached_child.clear_blobs(); cached_child.clear_blobs();
} }
self.blob_request_state.state.register_failure_downloading() self.blob_request_state.state.register_failure_downloading()
} }
/// This failure occurs after processing, so register a failure processing, penalize the peer if /// This failure occurs after processing, so register a failure processing, penalize the peer
/// necessary and clear the blob cache. /// and clear the blob cache.
pub fn handle_availability_check_failure(&mut self, cx: &SyncNetworkContext<T>) { pub fn handle_availability_check_failure(&mut self, cx: &SyncNetworkContext<T>) {
self.penalize_blob_peer(true, cx); self.penalize_blob_peer(cx);
if let Some(cached_child) = self.child_components.as_mut() { if let Some(cached_child) = self.child_components.as_mut() {
cached_child.clear_blobs(); cached_child.clear_blobs();
} }
@ -345,7 +326,7 @@ pub struct BlobRequestState<L: Lookup, T: EthSpec> {
} }
impl<L: Lookup, E: EthSpec> BlobRequestState<L, E> { impl<L: Lookup, E: EthSpec> BlobRequestState<L, E> {
pub fn new(block_root: Hash256, peer_source: &[PeerShouldHave], is_deneb: bool) -> Self { pub fn new(block_root: Hash256, peer_source: &[PeerId], is_deneb: bool) -> Self {
let default_ids = MissingBlobs::new_without_block(block_root, is_deneb); let default_ids = MissingBlobs::new_without_block(block_root, is_deneb);
Self { Self {
requested_ids: default_ids, requested_ids: default_ids,
@ -364,7 +345,7 @@ pub struct BlockRequestState<L: Lookup> {
} }
impl<L: Lookup> BlockRequestState<L> { impl<L: Lookup> BlockRequestState<L> {
pub fn new(block_root: Hash256, peers: &[PeerShouldHave]) -> Self { pub fn new(block_root: Hash256, peers: &[PeerId]) -> Self {
Self { Self {
requested_block_root: block_root, requested_block_root: block_root,
state: SingleLookupRequestState::new(peers), state: SingleLookupRequestState::new(peers),
@ -396,8 +377,6 @@ pub struct SingleLookupRequestState {
pub state: State, pub state: State,
/// Peers that should have this block or blob. /// Peers that should have this block or blob.
pub available_peers: HashSet<PeerId>, pub available_peers: HashSet<PeerId>,
/// Peers that mar or may not have this block or blob.
pub potential_peers: HashSet<PeerId>,
/// Peers from which we have requested this block. /// Peers from which we have requested this block.
pub used_peers: HashSet<PeerId>, pub used_peers: HashSet<PeerId>,
/// How many times have we attempted to process this block or blob. /// How many times have we attempted to process this block or blob.
@ -417,24 +396,15 @@ pub struct SingleLookupRequestState {
} }
impl SingleLookupRequestState { impl SingleLookupRequestState {
pub fn new(peers: &[PeerShouldHave]) -> Self { pub fn new(peers: &[PeerId]) -> Self {
let mut available_peers = HashSet::default(); let mut available_peers = HashSet::default();
let mut potential_peers = HashSet::default(); for peer in peers.iter().copied() {
for peer in peers { available_peers.insert(peer);
match peer {
PeerShouldHave::BlockAndBlobs(peer_id) => {
available_peers.insert(*peer_id);
}
PeerShouldHave::Neither(peer_id) => {
potential_peers.insert(*peer_id);
}
}
} }
Self { Self {
state: State::AwaitingDownload, state: State::AwaitingDownload,
available_peers, available_peers,
potential_peers,
used_peers: HashSet::default(), used_peers: HashSet::default(),
failed_processing: 0, failed_processing: 0,
failed_downloading: 0, failed_downloading: 0,
@ -462,25 +432,16 @@ impl SingleLookupRequestState {
self.failed_processing + self.failed_downloading self.failed_processing + self.failed_downloading
} }
/// This method should be used for peers wrapped in `PeerShouldHave::BlockAndBlobs`. /// This method should be used for peers wrapped in `PeerId::BlockAndBlobs`.
pub fn add_peer(&mut self, peer_id: &PeerId) { pub fn add_peer(&mut self, peer_id: &PeerId) {
self.potential_peers.remove(peer_id);
self.available_peers.insert(*peer_id); self.available_peers.insert(*peer_id);
} }
/// This method should be used for peers wrapped in `PeerShouldHave::Neither`.
pub fn add_potential_peer(&mut self, peer_id: &PeerId) {
if !self.available_peers.contains(peer_id) {
self.potential_peers.insert(*peer_id);
}
}
/// If a peer disconnects, this request could be failed. If so, an error is returned /// If a peer disconnects, this request could be failed. If so, an error is returned
pub fn check_peer_disconnected(&mut self, dc_peer_id: &PeerId) -> Result<(), ()> { pub fn check_peer_disconnected(&mut self, dc_peer_id: &PeerId) -> Result<(), ()> {
self.available_peers.remove(dc_peer_id); self.available_peers.remove(dc_peer_id);
self.potential_peers.remove(dc_peer_id);
if let State::Downloading { peer_id } = &self.state { if let State::Downloading { peer_id } = &self.state {
if peer_id.as_peer_id() == dc_peer_id { if peer_id == dc_peer_id {
// Peer disconnected before providing a block // Peer disconnected before providing a block
self.register_failure_downloading(); self.register_failure_downloading();
return Err(()); return Err(());
@ -491,21 +452,13 @@ impl SingleLookupRequestState {
/// Returns the id peer we downloaded from if we have downloaded a verified block, otherwise /// Returns the id peer we downloaded from if we have downloaded a verified block, otherwise
/// returns an error. /// returns an error.
pub fn processing_peer(&self) -> Result<PeerShouldHave, ()> { pub fn processing_peer(&self) -> Result<PeerId, ()> {
if let State::Processing { peer_id } = &self.state { if let State::Processing { peer_id } = &self.state {
Ok(*peer_id) Ok(*peer_id)
} else { } else {
Err(()) Err(())
} }
} }
/// Remove the given peer from the set of potential peers, so long as there is at least one
/// other potential peer or we have any available peers.
pub fn remove_peer_if_useless(&mut self, peer_id: &PeerId) {
if !self.available_peers.is_empty() || self.potential_peers.len() > 1 {
self.potential_peers.remove(peer_id);
}
}
} }
impl<L: Lookup, T: BeaconChainTypes> slog::Value for SingleBlockLookup<L, T> { impl<L: Lookup, T: BeaconChainTypes> slog::Value for SingleBlockLookup<L, T> {
@ -609,7 +562,7 @@ mod tests {
#[test] #[test]
fn test_happy_path() { fn test_happy_path() {
let peer_id = PeerShouldHave::BlockAndBlobs(PeerId::random()); let peer_id = PeerId::random();
let block = rand_block(); let block = rand_block();
let spec = E::default_spec(); let spec = E::default_spec();
let slot_clock = TestingSlotClock::new( let slot_clock = TestingSlotClock::new(
@ -649,7 +602,7 @@ mod tests {
#[test] #[test]
fn test_block_lookup_failures() { fn test_block_lookup_failures() {
let peer_id = PeerShouldHave::BlockAndBlobs(PeerId::random()); let peer_id = PeerId::random();
let block = rand_block(); let block = rand_block();
let spec = E::default_spec(); let spec = E::default_spec();
let slot_clock = TestingSlotClock::new( let slot_clock = TestingSlotClock::new(

View File

@ -233,11 +233,7 @@ fn test_single_block_lookup_happy_path() {
let peer_id = PeerId::random(); let peer_id = PeerId::random();
let block_root = block.canonical_root(); let block_root = block.canonical_root();
// Trigger the request // Trigger the request
bl.search_block( bl.search_block(block_root, &[peer_id], &mut cx);
block_root,
&[PeerShouldHave::BlockAndBlobs(peer_id)],
&mut cx,
);
let id = rig.expect_lookup_request(response_type); let id = rig.expect_lookup_request(response_type);
// If we're in deneb, a blob request should have been triggered as well, // If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
@ -285,11 +281,7 @@ fn test_single_block_lookup_empty_response() {
let peer_id = PeerId::random(); let peer_id = PeerId::random();
// Trigger the request // Trigger the request
bl.search_block( bl.search_block(block_hash, &[peer_id], &mut cx);
block_hash,
&[PeerShouldHave::BlockAndBlobs(peer_id)],
&mut cx,
);
let id = rig.expect_lookup_request(response_type); let id = rig.expect_lookup_request(response_type);
// If we're in deneb, a blob request should have been triggered as well, // If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
@ -317,11 +309,7 @@ fn test_single_block_lookup_wrong_response() {
let peer_id = PeerId::random(); let peer_id = PeerId::random();
// Trigger the request // Trigger the request
bl.search_block( bl.search_block(block_hash, &[peer_id], &mut cx);
block_hash,
&[PeerShouldHave::BlockAndBlobs(peer_id)],
&mut cx,
);
let id = rig.expect_lookup_request(response_type); let id = rig.expect_lookup_request(response_type);
// If we're in deneb, a blob request should have been triggered as well, // If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
@ -359,11 +347,7 @@ fn test_single_block_lookup_failure() {
let peer_id = PeerId::random(); let peer_id = PeerId::random();
// Trigger the request // Trigger the request
bl.search_block( bl.search_block(block_hash, &[peer_id], &mut cx);
block_hash,
&[PeerShouldHave::BlockAndBlobs(peer_id)],
&mut cx,
);
let id = rig.expect_lookup_request(response_type); let id = rig.expect_lookup_request(response_type);
// If we're in deneb, a blob request should have been triggered as well, // If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
@ -395,11 +379,7 @@ fn test_single_block_lookup_becomes_parent_request() {
let peer_id = PeerId::random(); let peer_id = PeerId::random();
// Trigger the request // Trigger the request
bl.search_block( bl.search_block(block.canonical_root(), &[peer_id], &mut cx);
block.canonical_root(),
&[PeerShouldHave::BlockAndBlobs(peer_id)],
&mut cx,
);
let id = rig.expect_lookup_request(response_type); let id = rig.expect_lookup_request(response_type);
// If we're in deneb, a blob request should have been triggered as well, // If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
@ -981,11 +961,7 @@ fn test_single_block_lookup_ignored_response() {
let peer_id = PeerId::random(); let peer_id = PeerId::random();
// Trigger the request // Trigger the request
bl.search_block( bl.search_block(block.canonical_root(), &[peer_id], &mut cx);
block.canonical_root(),
&[PeerShouldHave::BlockAndBlobs(peer_id)],
&mut cx,
);
let id = rig.expect_lookup_request(response_type); let id = rig.expect_lookup_request(response_type);
// If we're in deneb, a blob request should have been triggered as well, // If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test. // we don't require a response because we're generateing 0-blob blocks in this test.
@ -1205,7 +1181,6 @@ mod deneb_only {
AttestationUnknownBlock, AttestationUnknownBlock,
GossipUnknownParentBlock, GossipUnknownParentBlock,
GossipUnknownParentBlob, GossipUnknownParentBlob,
GossipUnknownBlockOrBlob,
} }
impl DenebTester { impl DenebTester {
@ -1234,11 +1209,7 @@ mod deneb_only {
let (block_req_id, blob_req_id, parent_block_req_id, parent_blob_req_id) = let (block_req_id, blob_req_id, parent_block_req_id, parent_blob_req_id) =
match request_trigger { match request_trigger {
RequestTrigger::AttestationUnknownBlock => { RequestTrigger::AttestationUnknownBlock => {
bl.search_block( bl.search_block(block_root, &[peer_id], &mut cx);
block_root,
&[PeerShouldHave::BlockAndBlobs(peer_id)],
&mut cx,
);
let block_req_id = rig.expect_lookup_request(ResponseType::Block); let block_req_id = rig.expect_lookup_request(ResponseType::Block);
let blob_req_id = rig.expect_lookup_request(ResponseType::Blob); let blob_req_id = rig.expect_lookup_request(ResponseType::Blob);
(Some(block_req_id), Some(blob_req_id), None, None) (Some(block_req_id), Some(blob_req_id), None, None)
@ -1261,7 +1232,7 @@ mod deneb_only {
bl.search_child_block( bl.search_child_block(
child_root, child_root,
ChildComponents::new(child_root, Some(child_block), None), ChildComponents::new(child_root, Some(child_block), None),
&[PeerShouldHave::Neither(peer_id)], &[peer_id],
&mut cx, &mut cx,
); );
@ -1299,7 +1270,7 @@ mod deneb_only {
bl.search_child_block( bl.search_child_block(
child_root, child_root,
ChildComponents::new(child_root, None, Some(blobs)), ChildComponents::new(child_root, None, Some(blobs)),
&[PeerShouldHave::Neither(peer_id)], &[peer_id],
&mut cx, &mut cx,
); );
@ -1316,12 +1287,6 @@ mod deneb_only {
Some(parent_blob_req_id), Some(parent_blob_req_id),
) )
} }
RequestTrigger::GossipUnknownBlockOrBlob => {
bl.search_block(block_root, &[PeerShouldHave::Neither(peer_id)], &mut cx);
let block_req_id = rig.expect_lookup_request(ResponseType::Block);
let blob_req_id = rig.expect_lookup_request(ResponseType::Blob);
(Some(block_req_id), Some(blob_req_id), None, None)
}
}; };
Some(Self { Some(Self {
@ -1838,186 +1803,6 @@ mod deneb_only {
.block_response_triggering_process(); .block_response_triggering_process();
} }
#[test]
fn single_block_and_blob_lookup_block_returned_first_gossip() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else {
return;
};
tester
.block_response_triggering_process()
.blobs_response()
.blobs_response_was_valid()
.block_imported();
}
#[test]
fn single_block_and_blob_lookup_blobs_returned_first_gossip() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else {
return;
};
tester
.blobs_response()
.blobs_response_was_valid()
.block_response_triggering_process()
.block_imported();
}
#[test]
fn single_block_and_blob_lookup_empty_response_gossip() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else {
return;
};
tester
.empty_block_response()
.expect_block_request()
.expect_no_penalty()
.expect_no_blobs_request()
.empty_blobs_response()
.expect_no_penalty()
.expect_no_block_request()
.expect_no_blobs_request()
.block_response_triggering_process()
.missing_components_from_block_request();
}
#[test]
fn single_block_response_then_empty_blob_response_gossip() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else {
return;
};
tester
.block_response_triggering_process()
.missing_components_from_block_request()
.empty_blobs_response()
.missing_components_from_blob_request()
.expect_blobs_request()
.expect_no_penalty()
.expect_no_block_request();
}
#[test]
fn single_blob_response_then_empty_block_response_gossip() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else {
return;
};
tester
.blobs_response()
.blobs_response_was_valid()
.expect_no_penalty()
.expect_no_block_request()
.expect_no_blobs_request()
.missing_components_from_blob_request()
.empty_block_response()
.expect_block_request()
.expect_no_penalty()
.expect_no_blobs_request();
}
#[test]
fn single_invalid_block_response_then_blob_response_gossip() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else {
return;
};
tester
.block_response_triggering_process()
.invalid_block_processed()
.expect_penalty()
.expect_block_request()
.expect_no_blobs_request()
.blobs_response()
.missing_components_from_blob_request()
.expect_no_penalty()
.expect_no_block_request()
.expect_no_block_request();
}
#[test]
fn single_block_response_then_invalid_blob_response_gossip() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else {
return;
};
tester
.block_response_triggering_process()
.missing_components_from_block_request()
.blobs_response()
.invalid_blob_processed()
.expect_penalty()
.expect_blobs_request()
.expect_no_block_request();
}
#[test]
fn single_block_response_then_too_few_blobs_response_gossip() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else {
return;
};
tester
.block_response_triggering_process()
.missing_components_from_block_request()
.invalidate_blobs_too_few()
.blobs_response()
.missing_components_from_blob_request()
.expect_blobs_request()
.expect_no_penalty()
.expect_no_block_request();
}
#[test]
fn single_block_response_then_too_many_blobs_response_gossip() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else {
return;
};
tester
.block_response_triggering_process()
.invalidate_blobs_too_many()
.blobs_response()
.expect_penalty()
.expect_blobs_request()
.expect_no_block_request();
}
#[test]
fn too_few_blobs_response_then_block_response_gossip() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else {
return;
};
tester
.invalidate_blobs_too_few()
.blobs_response()
.blobs_response_was_valid()
.missing_components_from_blob_request()
.expect_no_penalty()
.expect_no_blobs_request()
.expect_no_block_request()
.block_response_triggering_process()
.missing_components_from_block_request()
.expect_blobs_request();
}
#[test]
fn too_many_blobs_response_then_block_response_gossip() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownBlockOrBlob) else {
return;
};
tester
.invalidate_blobs_too_many()
.blobs_response()
.expect_penalty()
.expect_blobs_request()
.expect_no_block_request()
.block_response_triggering_process();
}
#[test] #[test]
fn parent_block_unknown_parent() { fn parent_block_unknown_parent() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlock) else { let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlock) else {

View File

@ -34,7 +34,7 @@
//! search for the block and subsequently search for parents if needed. //! search for the block and subsequently search for parents if needed.
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::{BlockLookups, PeerShouldHave}; use super::block_lookups::BlockLookups;
use super::network_context::{BlockOrBlob, SyncNetworkContext}; use super::network_context::{BlockOrBlob, SyncNetworkContext};
use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
@ -139,13 +139,6 @@ pub enum SyncMessage<T: EthSpec> {
/// manager to attempt to find the block matching the unknown hash. /// manager to attempt to find the block matching the unknown hash.
UnknownBlockHashFromAttestation(PeerId, Hash256), UnknownBlockHashFromAttestation(PeerId, Hash256),
/// A peer has sent a blob that references a block that is unknown or a peer has sent a block for
/// which we haven't received blobs.
///
/// We will either attempt to find the block matching the unknown hash immediately or queue a lookup,
/// which will then trigger the request when we receive `MissingGossipBlockComponentsDelayed`.
MissingGossipBlockComponents(Vec<PeerId>, Hash256),
/// A peer has disconnected. /// A peer has disconnected.
Disconnect(PeerId), Disconnect(PeerId),
@ -658,31 +651,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => { SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => {
// If we are not synced, ignore this block. // If we are not synced, ignore this block.
if self.synced_and_connected(&peer_id) { if self.synced_and_connected(&peer_id) {
self.block_lookups.search_block(
block_hash,
&[PeerShouldHave::BlockAndBlobs(peer_id)],
&mut self.network,
);
}
}
SyncMessage::MissingGossipBlockComponents(peer_id, block_root) => {
let peers_guard = self.network_globals().peers.read();
let connected_peers = peer_id
.into_iter()
.filter_map(|peer_id| {
if peers_guard.is_connected(&peer_id) {
Some(PeerShouldHave::Neither(peer_id))
} else {
None
}
})
.collect::<Vec<_>>();
drop(peers_guard);
// If we are not synced, ignore this block.
if self.synced() && !connected_peers.is_empty() {
self.block_lookups self.block_lookups
.search_block(block_root, &connected_peers, &mut self.network) .search_block(block_hash, &[peer_id], &mut self.network);
} }
} }
SyncMessage::Disconnect(peer_id) => { SyncMessage::Disconnect(peer_id) => {
@ -766,7 +736,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.block_lookups.search_child_block( self.block_lookups.search_child_block(
block_root, block_root,
child_components, child_components,
&[PeerShouldHave::Neither(peer_id)], &[peer_id],
&mut self.network, &mut self.network,
); );
} }