From ceaa740841acc990cf4f84fb5fd927e1ff4613ad Mon Sep 17 00:00:00 2001
From: ethDreamer <37123614+ethDreamer@users.noreply.github.com>
Date: Mon, 5 Jun 2023 08:09:42 -0500
Subject: [PATCH] Validate & Store Blobs During Backfill (#4307)

* Verify and Store Blobs During Backfill

* Improve logs

* Eliminated Clone

* Fix Initial Vector Capacity

* Addressed Sean's Comments
---
 .../beacon_chain/src/blob_verification.rs     |  9 ++
 .../src/data_availability_checker.rs          |  7 ++
 .../beacon_chain/src/historical_blocks.rs     | 55 ++++++++----
 beacon_node/beacon_chain/tests/store_tests.rs | 29 +++++--
 beacon_node/http_api/src/database.rs          | 17 +---
 beacon_node/http_api/src/lib.rs               | 28 +------
 .../beacon_processor/worker/sync_methods.rs   | 84 +++++++++++++++----
 7 files changed, 147 insertions(+), 82 deletions(-)

diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs
index 2216764c2..6f4868f14 100644
--- a/beacon_node/beacon_chain/src/blob_verification.rs
+++ b/beacon_node/beacon_chain/src/blob_verification.rs
@@ -651,6 +651,15 @@ impl<E: EthSpec> AsBlock<E> for BlockWrapper<E> {
     }
 }
 
+impl<E: EthSpec> BlockWrapper<E> {
+    pub fn n_blobs(&self) -> usize {
+        match self {
+            BlockWrapper::Block(_) => 0,
+            BlockWrapper::BlockAndBlobs(_, blobs) => blobs.len(),
+        }
+    }
+}
+
 impl<E: EthSpec> From<Arc<SignedBeaconBlock<E>>> for BlockWrapper<E> {
     fn from(value: Arc<SignedBeaconBlock<E>>) -> Self {
         Self::Block(value)
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 550515009..e32c78091 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -493,6 +493,13 @@ impl<E: EthSpec> AvailableBlock<E> {
             VerifiedBlobs::Available(blobs) => (self.block, Some(blobs)),
         }
     }
+
+    pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
+        match &self.blobs {
+            VerifiedBlobs::Available(blobs) => Some(blobs),
+            _ => None,
+        }
+    }
 }
 
 impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs
index 657415ac8..736a2b4f6 100644
--- a/beacon_node/beacon_chain/src/historical_blocks.rs
+++ b/beacon_node/beacon_chain/src/historical_blocks.rs
@@ -1,3 +1,4 @@
+use crate::data_availability_checker::AvailableBlock;
 use crate::{errors::BeaconChainError as Error, metrics, BeaconChain, BeaconChainTypes};
 use itertools::Itertools;
 use slog::debug;
@@ -7,10 +8,9 @@ use state_processing::{
 };
 use std::borrow::Cow;
 use std::iter;
-use std::sync::Arc;
 use std::time::Duration;
 use store::{chunked_vector::BlockRoots, AnchorInfo, ChunkWriter, KeyValueStore};
-use types::{Hash256, SignedBlindedBeaconBlock, Slot};
+use types::{Hash256, Slot};
 
 /// Use a longer timeout on the pubkey cache.
 ///
@@ -59,7 +59,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     /// Return the number of blocks successfully imported.
     pub fn import_historical_block_batch(
         &self,
-        blocks: Vec<Arc<SignedBlindedBeaconBlock<T::EthSpec>>>,
+        mut blocks: Vec<AvailableBlock<T::EthSpec>>,
     ) -> Result<usize, Error> {
         let anchor_info = self
             .store
             .get_anchor_info()
             .ok_or(HistoricalBlockError::NoAnchorInfo)?;
 
         // Take all blocks with slots less than the oldest block slot.
-        let num_relevant =
-            blocks.partition_point(|block| block.slot() < anchor_info.oldest_block_slot);
-        let blocks_to_import = &blocks
-            .get(..num_relevant)
-            .ok_or(HistoricalBlockError::IndexOutOfBounds)?;
+        let num_relevant = blocks.partition_point(|available_block| {
+            available_block.block().slot() < anchor_info.oldest_block_slot
+        });
 
-        if blocks_to_import.len() != blocks.len() {
+        let total_blocks = blocks.len();
+        blocks.truncate(num_relevant);
+        let blocks_to_import = blocks;
+
+        if blocks_to_import.len() != total_blocks {
             debug!(
                 self.log,
                 "Ignoring some historic blocks";
                 "oldest_block_slot" => anchor_info.oldest_block_slot,
-                "total_blocks" => blocks.len(),
-                "ignored" => blocks.len().saturating_sub(blocks_to_import.len()),
+                "total_blocks" => total_blocks,
+                "ignored" => total_blocks.saturating_sub(blocks_to_import.len()),
             );
         }
 
@@ -87,15 +89,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             return Ok(0);
         }
 
+        let n_blobs_to_import = blocks_to_import
+            .iter()
+            .map(|available_block| available_block.blobs().map_or(0, |blobs| blobs.len()))
+            .sum::<usize>();
+
         let mut expected_block_root = anchor_info.oldest_block_parent;
         let mut prev_block_slot = anchor_info.oldest_block_slot;
         let mut chunk_writer =
             ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
 
-        let mut cold_batch = Vec::with_capacity(blocks.len());
-        let mut hot_batch = Vec::with_capacity(blocks.len());
+        let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
+        let mut hot_batch = Vec::with_capacity(blocks_to_import.len() + n_blobs_to_import);
+        let mut signed_blocks = Vec::with_capacity(blocks_to_import.len());
+
+        for available_block in blocks_to_import.into_iter().rev() {
+            let (block, maybe_blobs) = available_block.deconstruct();
 
-        for block in blocks_to_import.iter().rev() {
             // Check chain integrity.
             let block_root = block.canonical_root();
 
@@ -107,9 +117,15 @@
                 .into());
             }
 
+            let blinded_block = block.clone_as_blinded();
             // Store block in the hot database without payload.
             self.store
-                .blinded_block_as_kv_store_ops(&block_root, block, &mut hot_batch);
+                .blinded_block_as_kv_store_ops(&block_root, &blinded_block, &mut hot_batch);
+            // Store the blobs too
+            if let Some(blobs) = maybe_blobs {
+                self.store
+                    .blobs_as_kv_store_ops(&block_root, blobs, &mut hot_batch);
+            }
 
             // Store block roots, including at all skip slots in the freezer DB.
             for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() {
@@ -132,8 +148,11 @@
                 expected_block_root = Hash256::zero();
                 break;
             }
+            signed_blocks.push(block);
         }
         chunk_writer.write(&mut cold_batch)?;
+        // these were pushed in reverse order so we reverse again
+        signed_blocks.reverse();
 
         // Verify signatures in one batch, holding the pubkey cache lock for the shortest duration
         // possible. For each block fetch the parent root from its successor. Slicing from index 1
@@ -144,13 +163,13 @@
             .validator_pubkey_cache
             .try_read_for(PUBKEY_CACHE_LOCK_TIMEOUT)
             .ok_or(HistoricalBlockError::ValidatorPubkeyCacheTimeout)?;
-        let block_roots = blocks_to_import
+        let block_roots = signed_blocks
             .get(1..)
             .ok_or(HistoricalBlockError::IndexOutOfBounds)?
             .iter()
             .map(|block| block.parent_root())
             .chain(iter::once(anchor_info.oldest_block_parent));
-        let signature_set = blocks_to_import
+        let signature_set = signed_blocks
             .iter()
             .zip_eq(block_roots)
             .filter_map(|(block, block_root)| {
@@ -207,6 +226,6 @@
             self.store_migrator.process_reconstruction();
         }
 
-        Ok(blocks_to_import.len())
+        Ok(num_relevant)
     }
 }
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index 239ae1806..b1d583fb4 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -8,9 +8,9 @@ use beacon_chain::test_utils::{
 };
 use beacon_chain::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD;
 use beacon_chain::{
-    historical_blocks::HistoricalBlockError, migrate::MigratorConfig, BeaconChain,
-    BeaconChainError, BeaconChainTypes, BeaconSnapshot, ChainConfig, NotifyExecutionLayer,
-    ServerSentEventHandler, WhenSlotSkipped,
+    blob_verification::MaybeAvailableBlock, historical_blocks::HistoricalBlockError,
+    migrate::MigratorConfig, BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot,
+    ChainConfig, NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped,
 };
 use eth2_network_config::TRUSTED_SETUP;
 use fork_choice::CountUnrealized;
@@ -2209,14 +2209,33 @@ async fn weak_subjectivity_sync() {
         .filter(|s| s.beacon_block.slot() != 0)
         .map(|s| s.beacon_block.clone())
         .collect::<Vec<_>>();
+
+    let mut available_blocks = vec![];
+    for blinded in historical_blocks {
+        let full_block = harness
+            .chain
+            .get_block(&blinded.canonical_root())
+            .await
+            .expect("should get block")
+            .expect("should get block");
+        if let MaybeAvailableBlock::Available(block) = harness
+            .chain
+            .data_availability_checker
+            .check_availability(full_block.into())
+            .expect("should check availability")
+        {
+            available_blocks.push(block);
+        }
+    }
+
     beacon_chain
-        .import_historical_block_batch(historical_blocks.clone())
+        .import_historical_block_batch(available_blocks.clone())
         .unwrap();
     assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0);
 
     // Resupplying the blocks should not fail, they can be safely ignored.
     beacon_chain
-        .import_historical_block_batch(historical_blocks)
+        .import_historical_block_batch(available_blocks)
         .unwrap();
 
     // The forwards iterator should now match the original chain
diff --git a/beacon_node/http_api/src/database.rs b/beacon_node/http_api/src/database.rs
index 645c19c40..37bf7958a 100644
--- a/beacon_node/http_api/src/database.rs
+++ b/beacon_node/http_api/src/database.rs
@@ -1,8 +1,7 @@
-use beacon_chain::store::{metadata::CURRENT_SCHEMA_VERSION, AnchorInfo};
+use beacon_chain::store::metadata::CURRENT_SCHEMA_VERSION;
 use beacon_chain::{BeaconChain, BeaconChainTypes};
 use eth2::lighthouse::DatabaseInfo;
 use std::sync::Arc;
-use types::SignedBlindedBeaconBlock;
 
 pub fn info<T: BeaconChainTypes>(
     chain: Arc<BeaconChain<T>>,
@@ -19,17 +18,3 @@
         anchor,
     })
 }
-
-pub fn historical_blocks<T: BeaconChainTypes>(
-    chain: Arc<BeaconChain<T>>,
-    blocks: Vec<Arc<SignedBlindedBeaconBlock<T::EthSpec>>>,
-) -> Result<AnchorInfo, warp::Rejection> {
-    chain
-        .import_historical_block_batch(blocks)
-        .map_err(warp_utils::reject::beacon_chain_error)?;
-
-    let anchor = chain.store.get_anchor_info().ok_or_else(|| {
-        warp_utils::reject::custom_bad_request("node is not checkpoint synced".to_string())
-    })?;
-    Ok(anchor)
-}
diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs
index a29e41f41..3edc6aa30 100644
--- a/beacon_node/http_api/src/lib.rs
+++ b/beacon_node/http_api/src/lib.rs
@@ -61,9 +61,9 @@ use types::{
     Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError,
     BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
     ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
-    SignedBeaconBlock, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
-    SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
-    SyncCommitteeMessage, SyncContributionData,
+    SignedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof,
+    SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
+    SyncContributionData,
 };
 use version::{
     add_consensus_version_header, execution_optimistic_finalized_fork_versioned_response,
@@ -3614,27 +3614,6 @@ pub fn serve<T: BeaconChainTypes>(
         })
     });
 
-    // POST lighthouse/database/historical_blocks
-    let post_lighthouse_database_historical_blocks = database_path
-        .and(warp::path("historical_blocks"))
-        .and(warp::path::end())
-        .and(warp::body::json())
-        .and(chain_filter.clone())
-        .and(log_filter.clone())
-        .and_then(
-            |blocks: Vec<Arc<SignedBlindedBeaconBlock<T::EthSpec>>>,
-             chain: Arc<BeaconChain<T>>,
-             log: Logger| {
-                info!(
-                    log,
-                    "Importing historical blocks";
-                    "count" => blocks.len(),
-                    "source" => "http_api"
-                );
-                blocking_json_task(move || database::historical_blocks(chain, blocks))
-            },
-        );
-
     // GET lighthouse/analysis/block_rewards
     let get_lighthouse_block_rewards = warp::path("lighthouse")
         .and(warp::path("analysis"))
@@ -3905,7 +3884,6 @@
                 .uor(post_validator_register_validator)
                 .uor(post_lighthouse_liveness)
                 .uor(post_lighthouse_database_reconstruct)
-                .uor(post_lighthouse_database_historical_blocks)
                 .uor(post_lighthouse_block_rewards)
                 .uor(post_lighthouse_ui_validator_metrics)
                 .uor(post_lighthouse_ui_validator_info)
diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs
index e8dcf747b..5a33650b6 100644
--- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs
@@ -7,8 +7,9 @@ use crate::beacon_processor::DuplicateCache;
 use crate::metrics;
 use crate::sync::manager::{BlockProcessType, SyncMessage};
 use crate::sync::{BatchProcessResult, ChainId};
-use beacon_chain::blob_verification::AsBlock;
 use beacon_chain::blob_verification::BlockWrapper;
+use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock};
+use beacon_chain::data_availability_checker::AvailabilityCheckError;
 use beacon_chain::{
     observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms,
     BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
@@ -18,10 +19,9 @@ use beacon_chain::{AvailabilityProcessingStatus, CountUnrealized};
 use lighthouse_network::PeerAction;
 use slog::{debug, error, info, warn};
 use slot_clock::SlotClock;
-use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::sync::mpsc;
-use types::{Epoch, Hash256, SignedBeaconBlock};
+use types::{Epoch, Hash256};
 
 /// Id associated to a batch processing request, either a sync batch or a parent lookup.
 #[derive(Clone, Debug, PartialEq)]
@@ -273,20 +273,19 @@ impl<T: BeaconChainTypes> Worker<T> {
         let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
         let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
         let sent_blocks = downloaded_blocks.len();
+        let n_blobs = downloaded_blocks
+            .iter()
+            .map(|wrapped| wrapped.n_blobs())
+            .sum::<usize>();
 
-        let unwrapped = downloaded_blocks
-            .into_iter()
-            //FIXME(sean) handle blobs in backfill
-            .map(|block| block.block_cloned())
-            .collect();
-
-        match self.process_backfill_blocks(unwrapped) {
+        match self.process_backfill_blocks(downloaded_blocks) {
             (_, Ok(_)) => {
                 debug!(self.log, "Backfill batch processed";
                     "batch_epoch" => epoch,
                     "first_block_slot" => start_slot,
                     "last_block_slot" => end_slot,
                     "processed_blocks" => sent_blocks,
+                    "processed_blobs" => n_blobs,
                     "service"=> "sync");
                 BatchProcessResult::Success {
                     was_non_empty: sent_blocks > 0,
@@ -297,6 +296,7 @@
                     "batch_epoch" => epoch,
                     "first_block_slot" => start_slot,
                     "last_block_slot" => end_slot,
+                    "processed_blobs" => n_blobs,
                     "error" => %e.message,
                     "service" => "sync");
                 match e.peer_action {
@@ -386,19 +386,67 @@ impl<T: BeaconChainTypes> Worker<T> {
     /// Helper function to process backfill block batches which only consumes the chain and blocks to process.
     fn process_backfill_blocks(
         &self,
-        blocks: Vec<Arc<SignedBeaconBlock<T::EthSpec>>>,
+        downloaded_blocks: Vec<BlockWrapper<T::EthSpec>>,
     ) -> (usize, Result<(), ChainSegmentFailed>) {
-        let blinded_blocks = blocks
-            .iter()
-            .map(|full_block| full_block.clone_as_blinded())
-            .map(Arc::new)
-            .collect();
-        match self.chain.import_historical_block_batch(blinded_blocks) {
+        let total_blocks = downloaded_blocks.len();
+        let available_blocks = match downloaded_blocks
+            .into_iter()
+            .map(|block| {
+                self.chain
+                    .data_availability_checker
+                    .check_availability(block)
+            })
+            .collect::<Result<Vec<_>, _>>()
+        {
+            Ok(blocks) => blocks
+                .into_iter()
+                .filter_map(|maybe_available| match maybe_available {
+                    MaybeAvailableBlock::Available(block) => Some(block),
+                    MaybeAvailableBlock::AvailabilityPending(_) => None,
+                })
+                .collect::<Vec<_>>(),
+            Err(e) => match e {
+                AvailabilityCheckError::StoreError(_)
+                | AvailabilityCheckError::KzgNotInitialized => {
+                    return (
+                        0,
+                        Err(ChainSegmentFailed {
+                            peer_action: None,
+                            message: "Failed to check block availability".into(),
+                        }),
+                    );
+                }
+                e => {
+                    return (
+                        0,
+                        Err(ChainSegmentFailed {
+                            peer_action: Some(PeerAction::LowToleranceError),
+                            message: format!("Failed to check block availability : {:?}", e),
+                        }),
+                    )
+                }
+            },
+        };
+
+        if available_blocks.len() != total_blocks {
+            return (
+                0,
+                Err(ChainSegmentFailed {
+                    peer_action: Some(PeerAction::LowToleranceError),
+                    message: format!(
+                        "{} out of {} blocks were unavailable",
+                        (total_blocks - available_blocks.len()),
+                        total_blocks
+                    ),
+                }),
+            );
+        }
+
+        match self.chain.import_historical_block_batch(available_blocks) {
             Ok(imported_blocks) => {
                 metrics::inc_counter(
                     &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_SUCCESS_TOTAL,
                 );
-
                 (imported_blocks, Ok(()))
             }
             Err(error) => {
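
Note (not part of the patch): the sketch below is a minimal illustration of the backfill flow this change establishes: each downloaded block is run through the data availability checker, and only fully available blocks (block plus verified blobs) are imported. It reuses `BeaconChain`, `BlockWrapper`, and `MaybeAvailableBlock` exactly as they appear in the diffs above; the helper name `import_backfill_batch` and the `String` error type are hypothetical, introduced only for the example.

    // Illustrative sketch only; mirrors process_backfill_blocks() above.
    use beacon_chain::blob_verification::{BlockWrapper, MaybeAvailableBlock};
    use beacon_chain::{BeaconChain, BeaconChainTypes};

    /// Hypothetical helper: check availability of every downloaded block,
    /// then import the whole backfill batch together with its blobs.
    fn import_backfill_batch<T: BeaconChainTypes>(
        chain: &BeaconChain<T>,
        downloaded_blocks: Vec<BlockWrapper<T::EthSpec>>,
    ) -> Result<usize, String> {
        let mut available_blocks = Vec::with_capacity(downloaded_blocks.len());
        for wrapped in downloaded_blocks {
            match chain
                .data_availability_checker
                .check_availability(wrapped)
                .map_err(|e| format!("availability check failed: {:?}", e))?
            {
                // Block plus verified blobs: safe to store.
                MaybeAvailableBlock::Available(block) => available_blocks.push(block),
                // A pending block means blobs are missing; the worker above
                // rejects the whole batch rather than importing a partial one.
                MaybeAvailableBlock::AvailabilityPending(_) => {
                    return Err("batch contains unavailable blocks".into());
                }
            }
        }
        // Stores blinded blocks and blob sidecars, verifies signatures in one
        // batch, and returns the number of blocks imported.
        chain
            .import_historical_block_batch(available_blocks)
            .map_err(|e| format!("backfill import failed: {:?}", e))
    }

The design choice the patch encodes is visible here: availability checking happens before any storage, so a batch is either imported whole (blocks and blobs in one hot-database write batch) or rejected, and the peer that served a partially available batch is penalized with a low-tolerance error.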