Validate & Store Blobs During Backfill (#4307)

* Verify and Store Blobs During Backfill

* Improve logs

* Eliminated Clone

* Fix Initial Vector Capacity

* Addressed Sean's Comments
This commit is contained in:
ethDreamer 2023-06-05 08:09:42 -05:00 committed by GitHub
parent 7a4be59884
commit ceaa740841
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 147 additions and 82 deletions

View File

@ -651,6 +651,15 @@ impl<E: EthSpec> AsBlock<E> for BlockWrapper<E> {
} }
} }
impl<E: EthSpec> BlockWrapper<E> {
    /// Number of blob sidecars carried alongside this block.
    ///
    /// A bare `Block` variant has no blobs, so it reports zero; the
    /// `BlockAndBlobs` variant reports the length of its blob list.
    pub fn n_blobs(&self) -> usize {
        if let BlockWrapper::BlockAndBlobs(_, blobs) = self {
            blobs.len()
        } else {
            0
        }
    }
}
impl<E: EthSpec> From<Arc<SignedBeaconBlock<E>>> for BlockWrapper<E> { impl<E: EthSpec> From<Arc<SignedBeaconBlock<E>>> for BlockWrapper<E> {
fn from(value: Arc<SignedBeaconBlock<E>>) -> Self { fn from(value: Arc<SignedBeaconBlock<E>>) -> Self {
Self::Block(value) Self::Block(value)

View File

@ -493,6 +493,13 @@ impl<E: EthSpec> AvailableBlock<E> {
VerifiedBlobs::Available(blobs) => (self.block, Some(blobs)), VerifiedBlobs::Available(blobs) => (self.block, Some(blobs)),
} }
} }
/// Borrow the verified blob sidecar list for this block, if one is attached.
///
/// Returns `Some(&BlobSidecarList)` only when the blobs are in the
/// `Available` state; every other state yields `None`.
pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
    if let VerifiedBlobs::Available(blobs) = &self.blobs {
        Some(blobs)
    } else {
        None
    }
}
} }
impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> { impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {

View File

@ -1,3 +1,4 @@
use crate::data_availability_checker::AvailableBlock;
use crate::{errors::BeaconChainError as Error, metrics, BeaconChain, BeaconChainTypes}; use crate::{errors::BeaconChainError as Error, metrics, BeaconChain, BeaconChainTypes};
use itertools::Itertools; use itertools::Itertools;
use slog::debug; use slog::debug;
@ -7,10 +8,9 @@ use state_processing::{
}; };
use std::borrow::Cow; use std::borrow::Cow;
use std::iter; use std::iter;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use store::{chunked_vector::BlockRoots, AnchorInfo, ChunkWriter, KeyValueStore}; use store::{chunked_vector::BlockRoots, AnchorInfo, ChunkWriter, KeyValueStore};
use types::{Hash256, SignedBlindedBeaconBlock, Slot}; use types::{Hash256, Slot};
/// Use a longer timeout on the pubkey cache. /// Use a longer timeout on the pubkey cache.
/// ///
@ -59,7 +59,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Return the number of blocks successfully imported. /// Return the number of blocks successfully imported.
pub fn import_historical_block_batch( pub fn import_historical_block_batch(
&self, &self,
blocks: Vec<Arc<SignedBlindedBeaconBlock<T::EthSpec>>>, mut blocks: Vec<AvailableBlock<T::EthSpec>>,
) -> Result<usize, Error> { ) -> Result<usize, Error> {
let anchor_info = self let anchor_info = self
.store .store
@ -67,19 +67,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.ok_or(HistoricalBlockError::NoAnchorInfo)?; .ok_or(HistoricalBlockError::NoAnchorInfo)?;
// Take all blocks with slots less than the oldest block slot. // Take all blocks with slots less than the oldest block slot.
let num_relevant = let num_relevant = blocks.partition_point(|available_block| {
blocks.partition_point(|block| block.slot() < anchor_info.oldest_block_slot); available_block.block().slot() < anchor_info.oldest_block_slot
let blocks_to_import = &blocks });
.get(..num_relevant)
.ok_or(HistoricalBlockError::IndexOutOfBounds)?;
if blocks_to_import.len() != blocks.len() { let total_blocks = blocks.len();
blocks.truncate(num_relevant);
let blocks_to_import = blocks;
if blocks_to_import.len() != total_blocks {
debug!( debug!(
self.log, self.log,
"Ignoring some historic blocks"; "Ignoring some historic blocks";
"oldest_block_slot" => anchor_info.oldest_block_slot, "oldest_block_slot" => anchor_info.oldest_block_slot,
"total_blocks" => blocks.len(), "total_blocks" => total_blocks,
"ignored" => blocks.len().saturating_sub(blocks_to_import.len()), "ignored" => total_blocks.saturating_sub(blocks_to_import.len()),
); );
} }
@ -87,15 +89,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(0); return Ok(0);
} }
let n_blobs_to_import = blocks_to_import
.iter()
.map(|available_block| available_block.blobs().map_or(0, |blobs| blobs.len()))
.sum::<usize>();
let mut expected_block_root = anchor_info.oldest_block_parent; let mut expected_block_root = anchor_info.oldest_block_parent;
let mut prev_block_slot = anchor_info.oldest_block_slot; let mut prev_block_slot = anchor_info.oldest_block_slot;
let mut chunk_writer = let mut chunk_writer =
ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?; ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
let mut cold_batch = Vec::with_capacity(blocks.len()); let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
let mut hot_batch = Vec::with_capacity(blocks.len()); let mut hot_batch = Vec::with_capacity(blocks_to_import.len() + n_blobs_to_import);
let mut signed_blocks = Vec::with_capacity(blocks_to_import.len());
for available_block in blocks_to_import.into_iter().rev() {
let (block, maybe_blobs) = available_block.deconstruct();
for block in blocks_to_import.iter().rev() {
// Check chain integrity. // Check chain integrity.
let block_root = block.canonical_root(); let block_root = block.canonical_root();
@ -107,9 +117,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.into()); .into());
} }
let blinded_block = block.clone_as_blinded();
// Store block in the hot database without payload. // Store block in the hot database without payload.
self.store self.store
.blinded_block_as_kv_store_ops(&block_root, block, &mut hot_batch); .blinded_block_as_kv_store_ops(&block_root, &blinded_block, &mut hot_batch);
// Store the blobs too
if let Some(blobs) = maybe_blobs {
self.store
.blobs_as_kv_store_ops(&block_root, blobs, &mut hot_batch);
}
// Store block roots, including at all skip slots in the freezer DB. // Store block roots, including at all skip slots in the freezer DB.
for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() { for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() {
@ -132,8 +148,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
expected_block_root = Hash256::zero(); expected_block_root = Hash256::zero();
break; break;
} }
signed_blocks.push(block);
} }
chunk_writer.write(&mut cold_batch)?; chunk_writer.write(&mut cold_batch)?;
// these were pushed in reverse order so we reverse again
signed_blocks.reverse();
// Verify signatures in one batch, holding the pubkey cache lock for the shortest duration // Verify signatures in one batch, holding the pubkey cache lock for the shortest duration
// possible. For each block fetch the parent root from its successor. Slicing from index 1 // possible. For each block fetch the parent root from its successor. Slicing from index 1
@ -144,13 +163,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.validator_pubkey_cache .validator_pubkey_cache
.try_read_for(PUBKEY_CACHE_LOCK_TIMEOUT) .try_read_for(PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(HistoricalBlockError::ValidatorPubkeyCacheTimeout)?; .ok_or(HistoricalBlockError::ValidatorPubkeyCacheTimeout)?;
let block_roots = blocks_to_import let block_roots = signed_blocks
.get(1..) .get(1..)
.ok_or(HistoricalBlockError::IndexOutOfBounds)? .ok_or(HistoricalBlockError::IndexOutOfBounds)?
.iter() .iter()
.map(|block| block.parent_root()) .map(|block| block.parent_root())
.chain(iter::once(anchor_info.oldest_block_parent)); .chain(iter::once(anchor_info.oldest_block_parent));
let signature_set = blocks_to_import let signature_set = signed_blocks
.iter() .iter()
.zip_eq(block_roots) .zip_eq(block_roots)
.filter_map(|(block, block_root)| { .filter_map(|(block, block_root)| {
@ -207,6 +226,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.store_migrator.process_reconstruction(); self.store_migrator.process_reconstruction();
} }
Ok(blocks_to_import.len()) Ok(num_relevant)
} }
} }

View File

@ -8,9 +8,9 @@ use beacon_chain::test_utils::{
}; };
use beacon_chain::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD; use beacon_chain::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD;
use beacon_chain::{ use beacon_chain::{
historical_blocks::HistoricalBlockError, migrate::MigratorConfig, BeaconChain, blob_verification::MaybeAvailableBlock, historical_blocks::HistoricalBlockError,
BeaconChainError, BeaconChainTypes, BeaconSnapshot, ChainConfig, NotifyExecutionLayer, migrate::MigratorConfig, BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot,
ServerSentEventHandler, WhenSlotSkipped, ChainConfig, NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped,
}; };
use eth2_network_config::TRUSTED_SETUP; use eth2_network_config::TRUSTED_SETUP;
use fork_choice::CountUnrealized; use fork_choice::CountUnrealized;
@ -2209,14 +2209,33 @@ async fn weak_subjectivity_sync() {
.filter(|s| s.beacon_block.slot() != 0) .filter(|s| s.beacon_block.slot() != 0)
.map(|s| s.beacon_block.clone()) .map(|s| s.beacon_block.clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut available_blocks = vec![];
for blinded in historical_blocks {
let full_block = harness
.chain
.get_block(&blinded.canonical_root())
.await
.expect("should get block")
.expect("should get block");
if let MaybeAvailableBlock::Available(block) = harness
.chain
.data_availability_checker
.check_availability(full_block.into())
.expect("should check availability")
{
available_blocks.push(block);
}
}
beacon_chain beacon_chain
.import_historical_block_batch(historical_blocks.clone()) .import_historical_block_batch(available_blocks.clone())
.unwrap(); .unwrap();
assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0); assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0);
// Resupplying the blocks should not fail, they can be safely ignored. // Resupplying the blocks should not fail, they can be safely ignored.
beacon_chain beacon_chain
.import_historical_block_batch(historical_blocks) .import_historical_block_batch(available_blocks)
.unwrap(); .unwrap();
// The forwards iterator should now match the original chain // The forwards iterator should now match the original chain

View File

@ -1,8 +1,7 @@
use beacon_chain::store::{metadata::CURRENT_SCHEMA_VERSION, AnchorInfo}; use beacon_chain::store::metadata::CURRENT_SCHEMA_VERSION;
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2::lighthouse::DatabaseInfo; use eth2::lighthouse::DatabaseInfo;
use std::sync::Arc; use std::sync::Arc;
use types::SignedBlindedBeaconBlock;
pub fn info<T: BeaconChainTypes>( pub fn info<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
@ -19,17 +18,3 @@ pub fn info<T: BeaconChainTypes>(
anchor, anchor,
}) })
} }
pub fn historical_blocks<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
blocks: Vec<Arc<SignedBlindedBeaconBlock<T::EthSpec>>>,
) -> Result<AnchorInfo, warp::Rejection> {
chain
.import_historical_block_batch(blocks)
.map_err(warp_utils::reject::beacon_chain_error)?;
let anchor = chain.store.get_anchor_info().ok_or_else(|| {
warp_utils::reject::custom_bad_request("node is not checkpoint synced".to_string())
})?;
Ok(anchor)
}

View File

@ -61,9 +61,9 @@ use types::{
Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError,
BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
SignedBeaconBlock, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
SyncCommitteeMessage, SyncContributionData, SyncContributionData,
}; };
use version::{ use version::{
add_consensus_version_header, execution_optimistic_finalized_fork_versioned_response, add_consensus_version_header, execution_optimistic_finalized_fork_versioned_response,
@ -3614,27 +3614,6 @@ pub fn serve<T: BeaconChainTypes>(
}) })
}); });
// POST lighthouse/database/historical_blocks
let post_lighthouse_database_historical_blocks = database_path
.and(warp::path("historical_blocks"))
.and(warp::path::end())
.and(warp::body::json())
.and(chain_filter.clone())
.and(log_filter.clone())
.and_then(
|blocks: Vec<Arc<SignedBlindedBeaconBlock<T::EthSpec>>>,
chain: Arc<BeaconChain<T>>,
log: Logger| {
info!(
log,
"Importing historical blocks";
"count" => blocks.len(),
"source" => "http_api"
);
blocking_json_task(move || database::historical_blocks(chain, blocks))
},
);
// GET lighthouse/analysis/block_rewards // GET lighthouse/analysis/block_rewards
let get_lighthouse_block_rewards = warp::path("lighthouse") let get_lighthouse_block_rewards = warp::path("lighthouse")
.and(warp::path("analysis")) .and(warp::path("analysis"))
@ -3905,7 +3884,6 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_validator_register_validator) .uor(post_validator_register_validator)
.uor(post_lighthouse_liveness) .uor(post_lighthouse_liveness)
.uor(post_lighthouse_database_reconstruct) .uor(post_lighthouse_database_reconstruct)
.uor(post_lighthouse_database_historical_blocks)
.uor(post_lighthouse_block_rewards) .uor(post_lighthouse_block_rewards)
.uor(post_lighthouse_ui_validator_metrics) .uor(post_lighthouse_ui_validator_metrics)
.uor(post_lighthouse_ui_validator_info) .uor(post_lighthouse_ui_validator_info)

View File

@ -7,8 +7,9 @@ use crate::beacon_processor::DuplicateCache;
use crate::metrics; use crate::metrics;
use crate::sync::manager::{BlockProcessType, SyncMessage}; use crate::sync::manager::{BlockProcessType, SyncMessage};
use crate::sync::{BatchProcessResult, ChainId}; use crate::sync::{BatchProcessResult, ChainId};
use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock};
use beacon_chain::data_availability_checker::AvailabilityCheckError;
use beacon_chain::{ use beacon_chain::{
observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms, observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms,
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
@ -18,10 +19,9 @@ use beacon_chain::{AvailabilityProcessingStatus, CountUnrealized};
use lighthouse_network::PeerAction; use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn}; use slog::{debug, error, info, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::{Epoch, Hash256, SignedBeaconBlock}; use types::{Epoch, Hash256};
/// Id associated to a batch processing request, either a sync batch or a parent lookup. /// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
@ -273,20 +273,19 @@ impl<T: BeaconChainTypes> Worker<T> {
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64()); let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
let sent_blocks = downloaded_blocks.len(); let sent_blocks = downloaded_blocks.len();
let n_blobs = downloaded_blocks
.iter()
.map(|wrapped| wrapped.n_blobs())
.sum::<usize>();
let unwrapped = downloaded_blocks match self.process_backfill_blocks(downloaded_blocks) {
.into_iter()
//FIXME(sean) handle blobs in backfill
.map(|block| block.block_cloned())
.collect();
match self.process_backfill_blocks(unwrapped) {
(_, Ok(_)) => { (_, Ok(_)) => {
debug!(self.log, "Backfill batch processed"; debug!(self.log, "Backfill batch processed";
"batch_epoch" => epoch, "batch_epoch" => epoch,
"first_block_slot" => start_slot, "first_block_slot" => start_slot,
"last_block_slot" => end_slot, "last_block_slot" => end_slot,
"processed_blocks" => sent_blocks, "processed_blocks" => sent_blocks,
"processed_blobs" => n_blobs,
"service"=> "sync"); "service"=> "sync");
BatchProcessResult::Success { BatchProcessResult::Success {
was_non_empty: sent_blocks > 0, was_non_empty: sent_blocks > 0,
@ -297,6 +296,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"batch_epoch" => epoch, "batch_epoch" => epoch,
"first_block_slot" => start_slot, "first_block_slot" => start_slot,
"last_block_slot" => end_slot, "last_block_slot" => end_slot,
"processed_blobs" => n_blobs,
"error" => %e.message, "error" => %e.message,
"service" => "sync"); "service" => "sync");
match e.peer_action { match e.peer_action {
@ -386,19 +386,67 @@ impl<T: BeaconChainTypes> Worker<T> {
/// Helper function to process backfill block batches which only consumes the chain and blocks to process. /// Helper function to process backfill block batches which only consumes the chain and blocks to process.
fn process_backfill_blocks( fn process_backfill_blocks(
&self, &self,
blocks: Vec<Arc<SignedBeaconBlock<T::EthSpec>>>, downloaded_blocks: Vec<BlockWrapper<T::EthSpec>>,
) -> (usize, Result<(), ChainSegmentFailed>) { ) -> (usize, Result<(), ChainSegmentFailed>) {
let blinded_blocks = blocks let total_blocks = downloaded_blocks.len();
.iter() let available_blocks = match downloaded_blocks
.map(|full_block| full_block.clone_as_blinded()) .into_iter()
.map(Arc::new) .map(|block| {
.collect(); self.chain
match self.chain.import_historical_block_batch(blinded_blocks) { .data_availability_checker
.check_availability(block)
})
.collect::<Result<Vec<_>, _>>()
{
Ok(blocks) => blocks
.into_iter()
.filter_map(|maybe_available| match maybe_available {
MaybeAvailableBlock::Available(block) => Some(block),
MaybeAvailableBlock::AvailabilityPending(_) => None,
})
.collect::<Vec<_>>(),
Err(e) => match e {
AvailabilityCheckError::StoreError(_)
| AvailabilityCheckError::KzgNotInitialized => {
return (
0,
Err(ChainSegmentFailed {
peer_action: None,
message: "Failed to check block availability".into(),
}),
);
}
e => {
return (
0,
Err(ChainSegmentFailed {
peer_action: Some(PeerAction::LowToleranceError),
message: format!("Failed to check block availability : {:?}", e),
}),
)
}
},
};
if available_blocks.len() != total_blocks {
return (
0,
Err(ChainSegmentFailed {
peer_action: Some(PeerAction::LowToleranceError),
message: format!(
"{} out of {} blocks were unavailable",
(total_blocks - available_blocks.len()),
total_blocks
),
}),
);
}
match self.chain.import_historical_block_batch(available_blocks) {
Ok(imported_blocks) => { Ok(imported_blocks) => {
metrics::inc_counter( metrics::inc_counter(
&metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_SUCCESS_TOTAL, &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_SUCCESS_TOTAL,
); );
(imported_blocks, Ok(())) (imported_blocks, Ok(()))
} }
Err(error) => { Err(error) => {