From 02c7a2eaf51d9cd9757a4d777d7a2c7816201bf4 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 8 Aug 2023 18:45:11 -0400 Subject: [PATCH] Improve single block/blob logging (#4579) * remove closure from `check_availability_and_maybe_import` * improve logging, add wrapper struct to requested ids * improve logging * only log if we're in deneb. Only delay lookup if we're in deneb * fix bug in missing components check --- beacon_node/beacon_chain/src/beacon_chain.rs | 66 +++++++++---- .../src/data_availability_checker.rs | 10 ++ .../gossip_methods.rs | 2 +- .../network_beacon_processor/sync_methods.rs | 6 +- .../network/src/sync/block_lookups/common.rs | 4 +- .../network/src/sync/block_lookups/mod.rs | 93 ++++++++++++++----- .../src/sync/block_lookups/parent_lookup.rs | 6 +- .../sync/block_lookups/single_block_lookup.rs | 49 +++++++++- .../network/src/sync/block_lookups/tests.rs | 8 +- beacon_node/network/src/sync/manager.rs | 66 ++++++++----- .../network/src/sync/network_context.rs | 16 +++- 11 files changed, 240 insertions(+), 86 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 32c7cc168..3489fb7ab 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -33,7 +33,6 @@ use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, Prep use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; use crate::head_tracker::HeadTracker; use crate::historical_blocks::HistoricalBlockError; -use crate::kzg_utils; use crate::light_client_finality_update_verification::{ Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate, }; @@ -68,6 +67,7 @@ use crate::validator_monitor::{ HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS, }; use crate::validator_pubkey_cache::ValidatorPubkeyCache; +use crate::{kzg_utils, AvailabilityPendingExecutedBlock}; use crate::{metrics, BeaconChainError, 
BeaconForkChoiceStore, BeaconSnapshot, CachedHead}; use eth2::types::{EventKind, SseBlock, SseExtendedPayloadAttributes, SyncDuty}; use execution_layer::{ @@ -118,7 +118,7 @@ use task_executor::{ShutdownReason, TaskExecutor}; use tokio_stream::Stream; use tree_hash::TreeHash; use types::beacon_state::CloneConfig; -use types::blob_sidecar::BlobSidecarList; +use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList}; use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS; use types::*; @@ -2786,10 +2786,7 @@ impl BeaconChain { self: &Arc, blob: GossipVerifiedBlob, ) -> Result> { - self.check_availability_and_maybe_import(blob.slot(), |chain| { - chain.data_availability_checker.put_gossip_blob(blob) - }) - .await + self.check_gossip_blob_availability_and_import(blob).await } /// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and @@ -2840,12 +2837,7 @@ impl BeaconChain { match executed_block { ExecutedBlock::Available(block) => self.import_available_block(Box::new(block)).await, ExecutedBlock::AvailabilityPending(block) => { - self.check_availability_and_maybe_import(block.block.slot(), |chain| { - chain - .data_availability_checker - .put_pending_executed_block(block) - }) - .await + self.check_block_availability_and_import(block).await } } } @@ -2934,17 +2926,57 @@ impl BeaconChain { } } - /// Accepts a fully-verified, available block and imports it into the chain without performing any - /// additional verification. + /* Import methods */ + + /// Checks if the block is available, and imports immediately if so, otherwise caches the block + /// in the data availability checker. 
+ pub async fn check_block_availability_and_import( + self: &Arc, + block: AvailabilityPendingExecutedBlock, + ) -> Result> { + let slot = block.block.slot(); + let availability = self + .data_availability_checker + .put_pending_executed_block(block)?; + self.process_availability(slot, availability).await + } + + /// Checks if the provided blob can make any cached blocks available, and imports immediately + /// if so, otherwise caches the blob in the data availability checker. + pub async fn check_gossip_blob_availability_and_import( + self: &Arc, + blob: GossipVerifiedBlob, + ) -> Result> { + let slot = blob.slot(); + let availability = self.data_availability_checker.put_gossip_blob(blob)?; + + self.process_availability(slot, availability).await + } + + /// Checks if the provided blobs can make any cached blocks available, and imports immediately + /// if so, otherwise caches the blob in the data availability checker. + pub async fn check_rpc_blob_availability_and_import( + self: &Arc, + slot: Slot, + block_root: Hash256, + blobs: FixedBlobSidecarList, + ) -> Result> { + let availability = self + .data_availability_checker + .put_rpc_blobs(block_root, blobs)?; + + self.process_availability(slot, availability).await + } + + /// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents` /// /// An error is returned if the block was unable to be imported. It may be partially imported /// (i.e., this function is not atomic). 
- pub async fn check_availability_and_maybe_import( + async fn process_availability( self: &Arc, slot: Slot, - cache_fn: impl FnOnce(Arc) -> Result, AvailabilityCheckError>, + availability: Availability, ) -> Result> { - let availability = cache_fn(self.clone())?; match availability { Availability::Available(block) => { // This is the time since start of the slot where all the components of the block have become available diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index f6130d26e..353b26e03 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -313,6 +313,16 @@ impl DataAvailabilityChecker { .map_or(false, |da_epoch| block_epoch >= da_epoch) } + /// Returns `true` if the current epoch is greater than or equal to the `Deneb` epoch. + pub fn is_deneb(&self) -> bool { + self.slot_clock.now().map_or(false, |slot| { + self.spec.deneb_fork_epoch.map_or(false, |deneb_epoch| { + let now_epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + now_epoch >= deneb_epoch + }) + }) + } + /// Persist all in memory components to disk pub fn persist_all(&self) -> Result<(), AvailabilityCheckError> { self.availability_cache.write_all_to_disk() diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 94096af88..4b2a11a56 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -721,7 +721,7 @@ impl NetworkBeaconProcessor { self.chain.recompute_head_at_current_slot().await; } Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_hash)) => { - debug!( + trace!( self.log, "Missing block components for gossip verified blob"; "slot" => %blob_slot, diff --git 
a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index b9d9a78f8..9489d5fab 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -287,11 +287,7 @@ impl NetworkBeaconProcessor { let result = self .chain - .check_availability_and_maybe_import(slot, |chain| { - chain - .data_availability_checker - .put_rpc_blobs(block_root, blobs) - }) + .check_rpc_blob_availability_and_import(slot, block_root, blobs) .await; // Sync handles these results diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 4f071a043..7e58006de 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -374,7 +374,7 @@ impl RequestState for BlobRequestState BlobsByRootRequest { BlobsByRootRequest { - blob_ids: VariableList::from(self.requested_ids.clone()), + blob_ids: self.requested_ids.clone().into(), } } @@ -402,7 +402,7 @@ impl RequestState for BlobRequestState= T::EthSpec::max_blobs_per_block() as u64 { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 53670e118..ee1a6a677 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -96,7 +96,7 @@ pub struct BlockLookups { single_block_lookups: FnvHashMap>, - da_checker: Arc>, + pub(crate) da_checker: Arc>, /// The logger for the import manager. 
log: Logger, @@ -126,10 +126,14 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) { let lookup = self.new_current_lookup(block_root, None, &[peer_source], cx); + if let Some(lookup) = lookup { + let msg = "Searching for block"; + lookup_creation_logging(msg, &lookup, peer_source, &self.log); self.trigger_single_lookup(lookup, cx); } } + /// Creates a lookup for the block with the given `block_root`. /// /// The request is not immediately triggered, and should be triggered by a call to @@ -142,6 +146,8 @@ impl BlockLookups { ) { let lookup = self.new_current_lookup(block_root, None, &[peer_source], cx); if let Some(lookup) = lookup { + let msg = "Initialized delayed lookup for block"; + lookup_creation_logging(msg, &lookup, peer_source, &self.log); self.add_single_lookup(lookup) } } @@ -155,13 +161,18 @@ impl BlockLookups { pub fn search_child_block( &mut self, block_root: Hash256, - child_components: Option>, - peer_source: &[PeerShouldHave], + child_components: CachedChildComponents, + peer_source: PeerShouldHave, cx: &mut SyncNetworkContext, ) { - let lookup = self.new_current_lookup(block_root, child_components, peer_source, cx); - if let Some(lookup) = lookup { - self.trigger_single_lookup(lookup, cx); + if child_components.is_missing_components() { + let lookup = + self.new_current_lookup(block_root, Some(child_components), &[peer_source], cx); + if let Some(lookup) = lookup { + let msg = "Searching for components of a block with unknown parent"; + lookup_creation_logging(msg, &lookup, peer_source, &self.log); + self.trigger_single_lookup(lookup, cx); + } } } @@ -175,13 +186,18 @@ impl BlockLookups { pub fn search_child_delayed( &mut self, block_root: Hash256, - child_components: Option>, - peer_source: &[PeerShouldHave], + child_components: CachedChildComponents, + peer_source: PeerShouldHave, cx: &mut SyncNetworkContext, ) { - let lookup = self.new_current_lookup(block_root, child_components, peer_source, cx); - if let Some(lookup) = lookup { - 
self.add_single_lookup(lookup) + if child_components.is_missing_components() { + let lookup = + self.new_current_lookup(block_root, Some(child_components), &[peer_source], cx); + if let Some(lookup) = lookup { + let msg = "Initialized delayed lookup for block with unknown parent"; + lookup_creation_logging(msg, &lookup, peer_source, &self.log); + self.add_single_lookup(lookup) + } } } @@ -218,6 +234,22 @@ impl BlockLookups { pub fn trigger_lookup_by_root(&mut self, block_root: Hash256, cx: &SyncNetworkContext) { self.single_block_lookups.retain(|_id, lookup| { if lookup.block_root() == block_root { + if lookup.da_checker.is_deneb() { + let blob_indices = lookup.blob_request_state.requested_ids.indices(); + debug!( + self.log, + "Triggering delayed single lookup"; + "block" => ?block_root, + "blob_indices" => ?blob_indices + ); + } else { + debug!( + self.log, + "Triggering delayed single lookup"; + "block" => ?block_root, + ); + } + if let Err(e) = lookup.request_block_and_blobs(cx) { debug!(self.log, "Delayed single block lookup failed"; "error" => ?e, @@ -271,13 +303,6 @@ impl BlockLookups { return None; } - debug!( - self.log, - "Searching for block"; - "peer_id" => ?peers, - "block" => ?block_root - ); - Some(SingleBlockLookup::new( block_root, child_components, @@ -583,7 +608,6 @@ impl BlockLookups { &mut parent_lookup, ) { Ok(()) => { - debug!(self.log, "Requesting parent"; &parent_lookup); self.parent_lookups.push(parent_lookup); } Err(e) => { @@ -1435,10 +1459,7 @@ impl BlockLookups { Err(e) => { self.handle_parent_request_error(&mut parent_lookup, cx, e); } - Ok(_) => { - debug!(self.log, "Requesting parent"; &parent_lookup); - self.parent_lookups.push(parent_lookup) - } + Ok(_) => self.parent_lookups.push(parent_lookup), } // We remove and add back again requests so we want this updated regardless of outcome. 
@@ -1460,3 +1481,29 @@ impl BlockLookups { self.parent_lookups.drain(..).len() } } + +fn lookup_creation_logging( + msg: &str, + lookup: &SingleBlockLookup, + peer_source: PeerShouldHave, + log: &Logger, +) { + let block_root = lookup.block_root(); + if lookup.da_checker.is_deneb() { + let blob_indices = lookup.blob_request_state.requested_ids.indices(); + debug!( + log, + "{}", msg; + "peer_id" => ?peer_source, + "block" => ?block_root, + "blob_indices" => ?blob_indices + ); + } else { + debug!( + log, + "{}", msg; + "peer_id" => ?peer_source, + "block" => ?block_root, + ); + } +} diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 56c509c16..93f2615c0 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -193,11 +193,11 @@ impl ParentLookup { ) -> Result, ParentVerifyError> { let expected_block_root = self.current_parent_request.block_root(); let request_state = R::request_state_mut(&mut self.current_parent_request); - let root_and_block = request_state.verify_response(expected_block_root, block)?; + let root_and_verified = request_state.verify_response(expected_block_root, block)?; // check if the parent of this block isn't in the failed cache. If it is, this chain should // be dropped and the peer downscored. 
- if let Some(parent_root) = root_and_block + if let Some(parent_root) = root_and_verified .as_ref() .and_then(|block| R::get_parent_root(block)) { @@ -207,7 +207,7 @@ impl ParentLookup { } } - Ok(root_and_block) + Ok(root_and_verified) } pub fn add_peers(&mut self, peer_source: &[PeerShouldHave]) { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 16badf613..897c0cad0 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -5,9 +5,12 @@ use crate::sync::network_context::SyncNetworkContext; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; use beacon_chain::BeaconChainTypes; +use lighthouse_network::rpc::methods::MaxRequestBlobSidecars; use lighthouse_network::{PeerAction, PeerId}; use slog::{trace, Logger}; +use ssz_types::VariableList; use std::collections::HashSet; +use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; use store::Hash256; @@ -257,7 +260,7 @@ impl SingleBlockLookup { /// Updates this request with the most recent picture of which blobs still need to be requested. 
pub fn update_blobs_request(&mut self) { - self.blob_request_state.requested_ids = self.missing_blob_ids() + self.blob_request_state.requested_ids = self.missing_blob_ids().into() } /// If `unknown_parent_components` is `Some`, we know block components won't hit the data @@ -319,12 +322,42 @@ impl SingleBlockLookup { } } +#[derive(Clone, Default)] +pub struct RequestedBlobIds(Vec); + +impl From> for RequestedBlobIds { + fn from(value: Vec) -> Self { + Self(value) + } +} + +impl Into> for RequestedBlobIds { + fn into(self) -> VariableList { + VariableList::from(self.0) + } +} + +impl RequestedBlobIds { + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + pub fn contains(&self, blob_id: &BlobIdentifier) -> bool { + self.0.contains(blob_id) + } + pub fn remove(&mut self, blob_id: &BlobIdentifier) { + self.0.retain(|id| id != blob_id) + } + pub fn indices(&self) -> Vec { + self.0.iter().map(|id| id.index).collect() + } +} + /// The state of the blob request component of a `SingleBlockLookup`. pub struct BlobRequestState { /// The latest picture of which blobs still need to be requested. This includes information /// from both block/blobs downloaded in the network layer and any blocks/blobs that exist in /// the data availability checker. - pub requested_ids: Vec, + pub requested_ids: RequestedBlobIds, /// Where we store blobs until we receive the stream terminator. pub blob_download_queue: FixedBlobSidecarList, pub state: SingleLookupRequestState, @@ -430,6 +463,16 @@ impl CachedChildComponents { .filter_map(|(i, blob_opt)| blob_opt.as_ref().map(|_| i)) .collect::>() } + + pub fn is_missing_components(&self) -> bool { + self.downloaded_block + .as_ref() + .map(|block| { + block.num_expected_blobs() + != self.downloaded_blobs.iter().filter(|b| b.is_some()).count() + }) + .unwrap_or(true) + } } /// Object representing the state of a single block or blob lookup request. 
@@ -562,7 +605,7 @@ impl slog::Value for SingleBlockLookup { serializer.emit_arguments("hash", &format_args!("{}", self.block_root()))?; serializer.emit_arguments( "blob_ids", - &format_args!("{:?}", self.blob_request_state.requested_ids), + &format_args!("{:?}", self.blob_request_state.requested_ids.indices()), )?; serializer.emit_arguments( "block_request_state.state", diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index ecc1cdc8e..b0217e5b7 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1306,8 +1306,8 @@ mod deneb_only { block_root = child_root; bl.search_child_block( child_root, - Some(CachedChildComponents::new(Some(child_block), None)), - &[PeerShouldHave::Neither(peer_id)], + CachedChildComponents::new(Some(child_block), None), + PeerShouldHave::Neither(peer_id), &mut cx, ); @@ -1344,8 +1344,8 @@ mod deneb_only { *blobs.index_mut(0) = Some(child_blob); bl.search_child_block( child_root, - Some(CachedChildComponents::new(None, Some(blobs))), - &[PeerShouldHave::Neither(peer_id)], + CachedChildComponents::new(None, Some(blobs)), + PeerShouldHave::Neither(peer_id), &mut cx, ); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 16855ab88..d8286c29f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -654,7 +654,7 @@ impl SyncManager { block_root, parent_root, block_slot, - Some(block.into()), + block.into(), ); } SyncMessage::UnknownParentBlob(peer_id, blob) => { @@ -673,7 +673,7 @@ impl SyncManager { block_root, parent_root, blob_slot, - Some(CachedChildComponents::new(None, Some(blobs))), + CachedChildComponents::new(None, Some(blobs)), ); } SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => { @@ -782,7 +782,7 @@ impl SyncManager { block_root: Hash256, parent_root: Hash256, slot: Slot, - 
child_components: Option>, + child_components: CachedChildComponents, ) { if self.should_search_for_block(slot, &peer_id) { self.block_lookups.search_parent( @@ -796,7 +796,7 @@ impl SyncManager { self.block_lookups.search_child_delayed( block_root, child_components, - &[PeerShouldHave::Neither(peer_id)], + PeerShouldHave::Neither(peer_id), &mut self.network, ); if let Err(e) = self @@ -809,7 +809,7 @@ impl SyncManager { self.block_lookups.search_child_block( block_root, child_components, - &[PeerShouldHave::Neither(peer_id)], + PeerShouldHave::Neither(peer_id), &mut self.network, ); } @@ -817,6 +817,10 @@ impl SyncManager { } fn should_delay_lookup(&mut self, slot: Slot) -> bool { + if !self.block_lookups.da_checker.is_deneb() { + return false; + } + let maximum_gossip_clock_disparity = self.chain.spec.maximum_gossip_clock_disparity(); let earliest_slot = self .chain @@ -1013,28 +1017,44 @@ impl SyncManager { RequestId::SingleBlock { .. } => { crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id ); } - RequestId::SingleBlob { id } => self - .block_lookups - .single_lookup_response::>( - id, - peer_id, - blob, - seen_timestamp, - &self.network, - ), + RequestId::SingleBlob { id } => { + if let Some(blob) = blob.as_ref() { + debug!(self.log, + "Peer returned blob for single lookup"; + "peer_id" => %peer_id , + "blob_id" =>?blob.id() + ); + } + self.block_lookups + .single_lookup_response::>( + id, + peer_id, + blob, + seen_timestamp, + &self.network, + ) + } RequestId::ParentLookup { id: _ } => { crit!(self.log, "Single blob received during parent block request"; "peer_id" => %peer_id ); } - RequestId::ParentLookupBlob { id } => self - .block_lookups - .parent_lookup_response::>( - id, - peer_id, - blob, - seen_timestamp, - &self.network, - ), + RequestId::ParentLookupBlob { id } => { + if let Some(blob) = blob.as_ref() { + debug!(self.log, + "Peer returned blob for parent lookup"; + "peer_id" => %peer_id , + "blob_id" =>?blob.id() + ); 
+ } + self.block_lookups + .parent_lookup_response::>( + id, + peer_id, + blob, + seen_timestamp, + &self.network, + ) + } RequestId::BackFillBlocks { id: _ } => { crit!(self.log, "Blob received during backfill block request"; "peer_id" => %peer_id ); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index df48005e4..4b75f5681 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -418,11 +418,11 @@ impl SyncNetworkContext { }; let request_id = RequestId::Sync(sync_id); - trace!( + debug!( self.log, "Sending BlocksByRoot Request"; "method" => "BlocksByRoot", - "count" => request.block_roots().len(), + "block_roots" => ?request.block_roots().to_vec(), "peer" => %peer_id, "lookup_type" => ?lookup_type ); @@ -448,12 +448,18 @@ impl SyncNetworkContext { }; let request_id = RequestId::Sync(sync_id); - if !blob_request.blob_ids.is_empty() { - trace!( + if let Some(block_root) = blob_request.blob_ids.first().map(|id| id.block_root) { + let indices = blob_request + .blob_ids + .iter() + .map(|id| id.index) + .collect::>(); + debug!( self.log, "Sending BlobsByRoot Request"; "method" => "BlobsByRoot", - "count" => blob_request.blob_ids.len(), + "block_root" => ?block_root, + "blob_indices" => ?indices, "peer" => %blob_peer_id, "lookup_type" => ?lookup_type );